Refactor timeline framework to properly enforce concurrency-safe observations, and fix various issues discovered in the implementation of expectation algebra.

Refactor pattern to use == rather than 'in'.

Improve timeline logging and timestamp everything that's logged.

Do not run checks in fixture finalization if test failed.
This commit is contained in:
Pavel Minaev 2018-10-05 06:21:40 -07:00 committed by Karthik Nadig
parent 8c3269543a
commit a204460039
11 changed files with 765 additions and 385 deletions

View file

@ -135,6 +135,14 @@ class RequestFailure(Exception):
def __init__(self, message):
self.message = message
def __eq__(self, other):
if isinstance(other, RequestFailure) and other.message == self.message:
return True
return NotImplemented
def __ne__(self, other):
return not self == other
class Request(object):
"""Represents a request that was sent to the other party, and is awaiting or has

View file

@ -1,3 +1,3 @@
[pytest]
testpaths=pytests
timeout=5
timeout=15

View file

@ -9,11 +9,34 @@ import pytest
import threading
import types
from . import helpers
from .helpers import print
from .helpers.session import DebugSession
@pytest.hookimpl(hookwrapper=True, tryfirst=True)
def pytest_runtest_makereport(item, call):
# Adds attributes such as setup_result, call_result etc to the item after the
# corresponding scope finished running its tests. This can be used in function-level
# fixtures to detect failures, e.g.:
#
# if request.node.call_result.failed: ...
outcome = yield
result = outcome.get_result()
setattr(item, result.when + '_result', result)
@pytest.hookimpl(hookwrapper=True, tryfirst=True)
def pytest_pyfunc_call(pyfuncitem):
# Resets the timestamp zero for every new test.
print('Timestamp clock reset to zero.', timestamped=False)
helpers.timestamp_zero = helpers.timestamp()
yield
@pytest.fixture
def daemon():
def daemon(request):
"""Provides a factory function for daemon threads. The returned thread is
started immediately, and it must not be alive by the time the test returns.
"""
@ -30,8 +53,14 @@ def daemon():
yield factory
for thread in daemons:
assert not thread.is_alive()
try:
failed = request.node.call_result.failed
except AttributeError:
pass
else:
if not failed:
for thread in daemons:
assert not thread.is_alive()
@pytest.fixture
@ -101,6 +130,12 @@ def debug_session(request):
session = DebugSession(request.param)
yield session
try:
session.wait_for_exit()
try:
failed = request.node.call_result.failed
except AttributeError:
pass
else:
if not failed:
session.wait_for_exit()
finally:
session.stop()

View file

@ -9,16 +9,18 @@ import pytest
from ..helpers.pattern import ANY, Pattern
from ..helpers.session import DebugSession
from ..helpers.timeline import Event
from ..helpers.timeline import Event, Request
@pytest.mark.timeout(20)
@pytest.mark.timeout(40)
@pytest.mark.skipif(platform.system() != 'Windows',
reason='Debugging multiprocessing module only works on Windows')
def test_multiprocessing(debug_session, pyfile):
@pyfile
def code_to_debug():
import multiprocessing
import platform
import sys
def child_of_child(q):
print('entering child of child')
@ -38,7 +40,11 @@ def test_multiprocessing(debug_session, pyfile):
if __name__ == '__main__':
import pytests.helpers.backchannel as backchannel
multiprocessing.set_start_method('spawn')
if sys.version_info >= (3, 4):
multiprocessing.set_start_method('spawn')
else:
assert platform.system() == 'Windows'
q = multiprocessing.Queue()
p = multiprocessing.Process(target=child, args=(q,))
p.start()
@ -50,37 +56,53 @@ def test_multiprocessing(debug_session, pyfile):
assert q.get() == 4
q.close()
debug_session.setup_backchannel()
debug_session.multiprocess = True
debug_session.prepare_to_run(filename=code_to_debug)
debug_session.start_debugging()
debug_session.prepare_to_run(filename=code_to_debug, backchannel=True)
start = debug_session.start_debugging()
with debug_session.timeline.frozen():
initial_request, = debug_session.timeline.all_occurrences_of(Request('launch') | Request('attach'))
initial_process = (start >> Event('process')).wait()
initial_pid = int(initial_process.body['systemProcessId'])
child_pid = debug_session.backchannel.read_json()
child_subprocess = debug_session.wait_until(Event('ptvsd_subprocess'))
assert child_subprocess.body in Pattern({
child_subprocess = (start >> Event('ptvsd_subprocess')).wait()
assert child_subprocess.body == Pattern({
'initialProcessId': initial_pid,
'parentProcessId': initial_pid,
'processId': child_pid,
'port': ANY.int,
'initialRequest': {
'command': initial_request.command,
'arguments': initial_request.arguments,
}
})
child_port = child_subprocess.body['port']
child_session = DebugSession(method='attach_socket', ptvsd_port=child_port)
child_session.connect()
child_session.handshake()
child_session.start_debugging()
child_start = child_session.start_debugging()
child_child_subprocess = child_session.wait_until(Event('ptvsd_subprocess'))
assert child_child_subprocess.body in Pattern({
child_child_subprocess = (child_start >> Event('ptvsd_subprocess')).wait()
assert child_child_subprocess.body == Pattern({
'initialProcessId': initial_pid,
'parentProcessId': child_pid,
'processId': ANY.int,
'port': ANY.int,
'initialRequest': {
'command': initial_request.command,
'arguments': initial_request.arguments,
}
})
child_child_port = child_child_subprocess.body['port']
child_child_session = DebugSession(method='attach_socket', ptvsd_port=child_child_port)
child_child_session.connect()
child_child_session.handshake()
child_child_session.start_debugging()
child_child_session.wait_until(Event('process'))
child_child_start = child_child_session.start_debugging()
(child_child_start >> Event('process')).wait()
debug_session.backchannel.write_json('continue')

View file

@ -11,17 +11,20 @@ from ..helpers.timeline import Event
def test_run(debug_session, pyfile):
@pyfile
def code_to_debug():
print('waiting for input')
input()
print('got input!')
from pytests.helpers import backchannel
print('before')
assert backchannel.read_json() == 1
print('after')
debug_session.prepare_to_run(filename=code_to_debug)
debug_session.start_debugging()
timeline = debug_session.timeline
debug_session.prepare_to_run(filename=code_to_debug, backchannel=True)
start = debug_session.start_debugging()
t = debug_session.wait_until(Event('process') & Event('thread'))
assert (
Event('thread', {'reason': 'started', 'threadId': ANY})
& (
first_thread = (start >> Event('thread', {'reason': 'started', 'threadId': ANY})).wait()
with timeline.frozen():
assert (
timeline.beginning
>>
Event('initialized', {})
>>
Event('process', {
@ -30,10 +33,11 @@ def test_run(debug_session, pyfile):
'startMethod': 'launch' if debug_session.method == 'launch' else 'attach',
'systemProcessId': debug_session.process.pid,
})
)
).has_occurred_by(t)
>>
first_thread
) in timeline
with debug_session.causing(Event('terminated', {})):
debug_session.process.communicate(b'0\n')
t = debug_session.write_json(1)
(t >> Event('terminated', {})).wait()
debug_session.wait_for_exit()

View file

@ -11,13 +11,29 @@ import time
import traceback
if sys.version_info >= (3, 3):
clock = time.perf_counter
else:
clock = time.clock
timestamp_zero = clock()
def timestamp():
return clock() - timestamp_zero
print_lock = threading.Lock()
real_print = print
def print(*args, **kwargs):
"""Like builtin print(), but synchronized using a global lock.
"""Like builtin print(), but synchronized using a global lock,
and adds a timestamp
"""
timestamped = kwargs.pop('timestamped', True)
with print_lock:
if timestamped:
real_print('@%09.6f: ' % timestamp(), end='')
real_print(*args, **kwargs)

View file

@ -14,6 +14,7 @@ import socket
from ptvsd.messaging import JsonIOStream
port = int(os.getenv('PTVSD_BACKCHANNEL_PORT'))
# print('Connecting to bchan#%d' % port)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', port))
stream = JsonIOStream.from_socket(sock)

View file

@ -116,12 +116,14 @@ class Maybe(BasePattern):
"""A pattern that matches if condition is True.
"""
name = None
def __init__(self, pattern, condition):
self.pattern = pattern
self.condition = condition
def __repr__(self):
return 'Maybe(%r)' % self.pattern
return self.name or 'Maybe(%r)' % self.pattern
def __eq__(self, value):
return self.condition(value) and value == self.pattern
@ -159,7 +161,15 @@ SUCCESS = Success(True)
FAILURE = Success(False)
ANY = Any()
ANY.bool = ANY.such_that(lambda x: x is True or x is False)
ANY.bool.name = 'ANY.bool'
ANY.str = ANY.such_that(lambda x: isinstance(x, unicode))
ANY.str.name = 'ANY.str'
ANY.num = ANY.such_that(lambda x: isinstance(x, numbers.Real))
ANY.num.name = 'ANY.num'
ANY.int = ANY.such_that(lambda x: isinstance(x, numbers.Integral))
ANY.int.name = 'ANY.int'

View file

@ -4,7 +4,6 @@
from __future__ import print_function, with_statement, absolute_import
import contextlib
import os
import socket
import subprocess
@ -16,7 +15,8 @@ import ptvsd
from ptvsd.messaging import JsonIOStream, JsonMessageChannel, MessageHandlers, RequestFailure
from . import print, watchdog
from .messaging import LoggingJsonStream
from .timeline import Timeline, Request, Response, Event
from .pattern import Pattern
from .timeline import Timeline, Request, Event
# ptvsd.__file__ will be <dir>/ptvsd/__main__.py - we want <dir>.
@ -70,10 +70,12 @@ class DebugSession(object):
except:
self.backchannel_socket = None
def prepare_to_run(self, perform_handshake=True, filename=None, module=None):
def prepare_to_run(self, perform_handshake=True, filename=None, module=None, backchannel=False):
"""Spawns ptvsd using the configured method, telling it to execute the
provided Python file or module, and establishes a message channel to it.
If backchannel is True, calls self.setup_backchannel() before returning.
If perform_handshake is True, calls self.handshake() before returning.
"""
@ -102,6 +104,9 @@ class DebugSession(object):
env = os.environ.copy()
env.update({'PYTHONPATH': PTVSD_SYS_PATH})
if backchannel:
self.setup_backchannel()
if self.backchannel_port:
env['PTVSD_BACKCHANNEL_PORT'] = str(self.backchannel_port)
@ -115,10 +120,16 @@ class DebugSession(object):
self.connected.wait()
assert self.ptvsd_port
assert self.socket
print('ptvsd@%d has pid=%d' % (self.ptvsd_port, self.process.pid))
print('ptvsd#%d has pid=%d' % (self.ptvsd_port, self.process.pid))
self.timeline.beginning.await_following(Event('output', Pattern({
'category': 'telemetry',
'output': 'ptvsd',
'data': {'version': ptvsd.__version__}
})))
if perform_handshake:
self.handshake()
return self.handshake()
def wait_for_exit(self, expected_returncode=0):
"""Waits for the spawned ptvsd process to exit. If it doesn't exit within
@ -128,10 +139,10 @@ class DebugSession(object):
def kill():
time.sleep(self.WAIT_FOR_EXIT_TIMEOUT)
print('ptvsd process %d timed out, killing it' % self.process.pid)
print('ptvsd#%r (pid=%d) timed out, killing it' % (self.ptvsd_port, self.process.pid))
if self.is_running:
self.process.kill()
kill_thread = threading.Thread(target=kill)
kill_thread = threading.Thread(target=kill, name='ptvsd#%r watchdog (pid=%d)' % (self.ptvsd_port, self.process.pid))
kill_thread.daemon = True
kill_thread.start()
@ -151,12 +162,12 @@ class DebugSession(object):
self.server_socket.listen(0)
def accept_worker():
print('Listening for incoming connection from ptvsd@%d' % self.ptvsd_port)
print('Listening for incoming connection from ptvsd#%d' % self.ptvsd_port)
self.socket, _ = self.server_socket.accept()
print('Incoming ptvsd@%d connection accepted' % self.ptvsd_port)
print('Incoming ptvsd#%d connection accepted' % self.ptvsd_port)
self._setup_channel()
accept_thread = threading.Thread(target=accept_worker)
accept_thread = threading.Thread(target=accept_worker, name='ptvsd#%d listener' % self.ptvsd_port)
accept_thread.daemon = True
accept_thread.start()
@ -171,15 +182,15 @@ class DebugSession(object):
time.sleep(0.1)
def _try_connect(self):
print('Trying to connect to ptvsd@%d' % self.ptvsd_port)
print('Trying to connect to ptvsd#%d' % self.ptvsd_port)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', self.ptvsd_port))
print('Successfully connected to ptvsd@%d' % self.ptvsd_port)
print('Successfully connected to ptvsd#%d' % self.ptvsd_port)
self.socket = sock
self._setup_channel()
def _setup_channel(self):
self.stream = LoggingJsonStream(JsonIOStream.from_socket(self.socket), 'ptvsd@%d' % self.ptvsd_port)
self.stream = LoggingJsonStream(JsonIOStream.from_socket(self.socket), 'ptvsd#%d' % self.ptvsd_port)
handlers = MessageHandlers(request=self._process_request, event=self._process_event)
self.channel = JsonMessageChannel(self.stream, handlers)
self.channel.start()
@ -189,20 +200,13 @@ class DebugSession(object):
request = self.timeline.record_request(command, arguments)
request.sent = self.channel.send_request(command, arguments)
request.sent.on_response(lambda response: self._process_response(request, response))
assert Request(command, arguments) in self.timeline
request.causing = lambda expectation: request.await_following(expectation) and request
assert Request(command, arguments).is_realized_by(request)
return request
def mark(self, id):
return self.timeline.mark(id)
@contextlib.contextmanager
def causing(self, expectation):
assert expectation not in self.timeline
promised_occurrence = ['ONLY VALID AFTER END OF BLOCK!']
yield promised_occurrence
occ = self.wait_until(expectation)
promised_occurrence[:] = (occ,)
def handshake(self):
"""Performs the handshake that establishes the debug session ('initialized'
and 'launch' or 'attach').
@ -213,8 +217,9 @@ class DebugSession(object):
to finalize the configuration stage, and start running code.
"""
with self.causing(Event('initialized', {})):
self.send_request('initialize', {'adapterID': 'test'}).wait_for_response()
(self.send_request('initialize', {'adapterID': 'test'})
.causing(Event('initialized', {}))
.wait_for_response())
request = 'launch' if self.method == 'launch' else 'attach'
self.send_request(request, {'debugOptions': self.debug_options}).wait_for_response()
@ -226,7 +231,7 @@ class DebugSession(object):
After this method returns, ptvsd is running the code in the script file or module
that was specified in prepare_to_run().
"""
self.send_request('configurationDone').wait_for_response()
return self.send_request('configurationDone').wait_for_response()
def _process_event(self, channel, event, body):
self.timeline.record_event(event, body)
@ -236,30 +241,24 @@ class DebugSession(object):
def _process_response(self, request, response):
body = response.body if response.success else RequestFailure(response.error_message)
self.timeline.record_response(request, body)
assert Response(request, body) in self.timeline
def _process_request(self, channel, command, arguments):
assert False, 'ptvsd should not be sending requests.'
def wait_until(self, expectation):
return self.timeline.wait_until(expectation)
def history(self):
return self.timeline.history
def setup_backchannel(self):
assert self.process is None, 'setup_backchannel() must be called before prepare_to_run()'
self.backchannel_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.backchannel_socket.bind(('localhost', 0))
_, self.backchannel_port = self.backchannel_socket.getsockname()
self.backchannel_socket.listen(0)
backchannel_thread = threading.Thread(target=self._backchannel_worker)
backchannel_thread = threading.Thread(target=self._backchannel_worker, name='ptvsd#%d backchannel' % self.ptvsd_port)
backchannel_thread.daemon = True
backchannel_thread.start()
def _backchannel_worker(self):
print('Listening for incoming backchannel connection for bchan#%d' % self.ptvsd_port)
sock, _ = self.backchannel_socket.accept()
self._backchannel_stream = LoggingJsonStream(JsonIOStream.from_socket(sock), 'bchan@%d' % self.ptvsd_port)
print('Incoming bchan#%d backchannel connection accepted' % self.ptvsd_port)
self._backchannel_stream = LoggingJsonStream(JsonIOStream.from_socket(sock), 'bchan#%d' % self.ptvsd_port)
self.backchannel_established.set()
@property
@ -267,3 +266,11 @@ class DebugSession(object):
assert self.backchannel_port, 'backchannel() must be called after setup_backchannel()'
self.backchannel_established.wait()
return self._backchannel_stream
def read_json(self):
return self.backchannel.read_json()
def write_json(self, value):
t = self.timeline.mark(('sending', value))
self.backchannel.write_json(value)
return t

View file

@ -10,7 +10,7 @@ import time
from ptvsd.messaging import RequestFailure
from .pattern import Pattern, ANY, SUCCESS, FAILURE
from .pattern import Pattern, ANY, SUCCESS, FAILURE, Is
from .timeline import Timeline, Mark, Event, Request, Response
@ -25,87 +25,111 @@ def make_timeline():
def factory():
timeline = Timeline()
assert Mark(Pattern('begin')) in timeline
assert timeline.history() in Pattern([('Mark', 'begin',)])
timelines.append(timeline)
return timeline
yield factory
for timeline in timelines:
timeline.freeze()
history = list(timeline.history())
timeline.finalize()
history = timeline.history()
history.sort(key=lambda occ: occ.timestamp)
assert history == timeline.history()
def history_data(timeline):
return [occ.__data__() for occ in timeline.history()]
with timeline.frozen():
return [occ.__data__() for occ in timeline.history()]
def test_simple_1thread(make_timeline):
timeline = make_timeline()
expected_history = history_data(timeline)
expectations = []
with timeline.frozen():
assert timeline.all_occurrences_of(Mark('tada')) == ()
assert not Mark('tada').has_been_realized_in(timeline)
mark = timeline.mark('tada')
expected_history += [('Mark', 'tada')]
assert mark.circumstances == ('Mark', 'tada')
assert mark.id == 'tada'
expectations += [Mark('tada')]
assert expectations in timeline
expected_history += [('Mark', 'tada')]
assert timeline.history() in Pattern(expected_history)
with timeline.frozen():
assert timeline.history() == Pattern(expected_history)
assert timeline.all_occurrences_of(Mark('tada')) == Pattern((Is(mark),))
assert timeline.last is mark
assert Mark('tada').has_been_realized_in(timeline)
request = timeline.record_request('next', {'threadId': 3})
expected_history += [('Request', 'next', {'threadId': 3})]
assert request.circumstances == ('Request', 'next', {'threadId': 3})
assert request.command == 'next'
assert request.arguments == {'threadId': 3}
expectations += [Request('next', {'threadId': 3})]
assert expectations in timeline
expected_history += [('Request', 'next', {'threadId': 3})]
assert timeline.history() in Pattern(expected_history)
with timeline.frozen():
assert timeline.history() == Pattern(expected_history)
assert timeline.last is request
assert Request('next', {'threadId': 3}).has_been_realized_in(timeline)
assert timeline.all_occurrences_of(Request('next', {'threadId': 3})) == Pattern((Is(request),))
response = timeline.record_response(request, {})
assert response.circumstances == ('Response', request, {})
expected_history += [('Response', Is(request), {})]
assert response.circumstances == Pattern(('Response', Is(request), {}))
assert response.request is request
assert response.body == {}
assert response.success
expectations += [
expectations = [
Response(request, {}),
Response(request, SUCCESS),
Response(request),
Response(Request('next', {'threadId': 3}), {}),
Response(Request('next', {'threadId': 3}), SUCCESS),
Response(Request('next', {'threadId': 3})),
]
assert expectations in timeline
expected_history += [('Response', request, {})]
assert timeline.history() in Pattern(expected_history)
with timeline.frozen():
assert timeline.history() == Pattern(expected_history)
assert timeline.last is response
for exp in expectations:
print(exp)
assert exp.has_been_realized_in(timeline)
print(timeline.all_occurrences_of(exp))
assert timeline.all_occurrences_of(exp) == Pattern((Is(response),))
event = timeline.record_event('stopped', {'reason': 'pause'})
assert event.circumstances == ('Event', 'stopped', {'reason': 'pause'})
expectations += [Event('stopped', {'reason': 'pause'})]
assert expectations in timeline
expected_history += [('Event', 'stopped', {'reason': 'pause'})]
assert timeline.history() in Pattern(expected_history)
request = timeline.record_request('next', {'threadId': 6})
assert event.circumstances == ('Event', 'stopped', {'reason': 'pause'})
with timeline.frozen():
assert timeline.last is event
assert timeline.history() == Pattern(expected_history)
assert Event('stopped', {'reason': 'pause'}).has_been_realized_in(timeline)
assert timeline.all_occurrences_of(Event('stopped', {'reason': 'pause'})) == Pattern((Is(event),))
request2 = timeline.record_request('next', {'threadId': 6})
expected_history += [('Request', 'next', {'threadId': 6})]
response2 = timeline.record_response(request2, RequestFailure('error!'))
expected_history += [('Response', Is(request2), FAILURE)]
response = timeline.record_response(request, RequestFailure('error!'))
assert response.circumstances in Pattern((
'Response',
request,
ANY.such_that(lambda err: isinstance(err, RequestFailure) and err.message == 'error!')
))
assert response.request is request
assert isinstance(response.body, RequestFailure) and response.body.message == 'error!'
assert not response.success
expectations += [
Response(request, FAILURE),
assert response2.circumstances == Pattern(('Response', Is(request2), FAILURE))
assert response2.request is request2
assert isinstance(response2.body, RequestFailure) and response2.body.message == 'error!'
assert not response2.success
expectations = [
Response(request2, RequestFailure('error!')),
Response(request2, FAILURE),
Response(request2),
Response(Request('next', {'threadId': 6}), RequestFailure('error!')),
Response(Request('next', {'threadId': 6}), FAILURE),
Response(Request('next', {'threadId': 6})),
]
assert expectations in timeline
expected_history += [('Response', request, FAILURE)]
assert timeline.history() in Pattern(expected_history)
with timeline.frozen():
for exp in expectations:
print(exp)
assert exp.has_been_realized_in(timeline)
assert timeline.all_occurrences_of(exp) == Pattern((Is(response2),))
assert timeline.all_occurrences_of(Response(Request('next'))) == Pattern((Is(response), Is(response2)))
@pytest.mark.parametrize('occurs_before_wait', (True, False))
@ -146,34 +170,93 @@ def test_simple_mthread(make_timeline, daemon, occurs_before_wait):
thev.set()
threading.Thread(target=set_later).start()
t = timeline.beginning
advance_worker()
expectation = Mark('tada')
timeline.wait_until(expectation)
assert expectation in timeline
expected_history += [('Mark', 'tada')]
assert timeline.history() in Pattern(expected_history)
t = t.await_following(expectation)
with timeline.frozen():
assert expectation.has_been_realized_in(timeline)
assert timeline.history() == Pattern(expected_history)
advance_worker()
expectation = Request('next', {'threadId': 3})
timeline.wait_until(expectation)
assert expectation in timeline
expected_history += [('Request', 'next', {'threadId': 3})]
assert timeline.history() in Pattern(expected_history)
request = occurrences[-1]
expectation = Request('next', {'threadId': 3})
t = t.await_following(expectation)
with timeline.frozen():
assert expectation.has_been_realized_in(timeline)
assert timeline.history() == Pattern(expected_history)
request = occurrences[-1]
assert request is t
advance_worker()
expectation = Response(request, {}) & Response(Request('next', {'threadId': 3}), {})
timeline.wait_until(expectation)
assert expectation in timeline
expected_history += [('Response', request, {})]
assert timeline.history() in Pattern(expected_history)
expectation = Response(request, {}) & Response(Request('next', {'threadId': 3}), {})
t = t.await_following(expectation)
with timeline.frozen():
assert expectation.has_been_realized_in(timeline)
assert timeline.history() == Pattern(expected_history)
advance_worker()
expectation = Event('stopped', {'reason': 'pause'})
timeline.wait_until(expectation)
assert expectation in timeline
expected_history += [('Event', 'stopped', {'reason': 'pause'})]
assert timeline.history() in Pattern(expected_history)
expectation = Event('stopped', {'reason': 'pause'})
with timeline.frozen():
t = t.await_following(expectation)
assert expectation.has_been_realized_in(timeline)
assert timeline.history() == Pattern(expected_history)
def test_after(make_timeline):
timeline = make_timeline()
first = timeline.mark('first')
second_exp = first >> Mark('second')
with timeline.frozen():
assert second_exp not in timeline
timeline.mark('second')
with timeline.frozen():
assert second_exp in timeline
def test_before(make_timeline):
timeline = make_timeline()
t = timeline.beginning
first = timeline.mark('first')
timeline.mark('second')
with timeline.frozen():
assert t >> Mark('second') >> Mark('first') not in timeline
assert Mark('second') >> first not in timeline
third = timeline.mark('third')
with timeline.frozen():
assert t >> Mark('second') >> Mark('first') not in timeline
assert Mark('second') >> first not in timeline
assert t >> Mark('second') >> Mark('third') in timeline
assert Mark('second') >> third in timeline
def test_not(make_timeline):
timeline = make_timeline()
timeline.mark('other')
with timeline.frozen():
assert timeline.beginning >> ~Mark('something') in timeline
t = timeline.last
timeline.mark('something')
with timeline.frozen():
assert timeline.beginning >> ~Mark('something') in timeline
assert t >> ~Mark('something') not in timeline
def test_and(make_timeline):
@ -182,24 +265,30 @@ def test_and(make_timeline):
cheese_exp = Mark('cheese')
timeline = make_timeline()
assert eggs_exp & ham_exp not in timeline
assert ham_exp & eggs_exp not in timeline
assert cheese_exp & ham_exp & eggs_exp not in timeline
t = timeline.beginning
with timeline.frozen():
assert t >> (eggs_exp & ham_exp) not in timeline
assert t >> (ham_exp & eggs_exp) not in timeline
assert t >> (cheese_exp & ham_exp & eggs_exp) not in timeline
timeline.mark('eggs')
assert eggs_exp & ham_exp not in timeline
assert ham_exp & eggs_exp not in timeline
assert cheese_exp & ham_exp & eggs_exp not in timeline
with timeline.frozen():
assert t >> (eggs_exp & ham_exp) not in timeline
assert t >> (ham_exp & eggs_exp) not in timeline
assert t >> (cheese_exp & ham_exp & eggs_exp) not in timeline
timeline.mark('ham')
assert eggs_exp & ham_exp in timeline
assert ham_exp & eggs_exp in timeline
assert cheese_exp & ham_exp & eggs_exp not in timeline
with timeline.frozen():
assert t >> (eggs_exp & ham_exp) in timeline
assert t >> (ham_exp & eggs_exp) in timeline
assert t >> (cheese_exp & ham_exp & eggs_exp) not in timeline
timeline.mark('cheese')
assert eggs_exp & ham_exp in timeline
assert ham_exp & eggs_exp in timeline
assert cheese_exp & ham_exp & eggs_exp in timeline
with timeline.frozen():
assert t >> (eggs_exp & ham_exp) in timeline
assert t >> (ham_exp & eggs_exp) in timeline
assert t >> (cheese_exp & ham_exp & eggs_exp) in timeline
def test_or(make_timeline):
@ -208,33 +297,37 @@ def test_or(make_timeline):
cheese_exp = Mark('cheese')
timeline = make_timeline()
assert eggs_exp | ham_exp not in timeline
assert ham_exp | eggs_exp not in timeline
assert cheese_exp | ham_exp | eggs_exp not in timeline
t = timeline.beginning
with timeline.frozen():
assert t >> (eggs_exp | ham_exp) not in timeline
assert t >> (ham_exp | eggs_exp) not in timeline
assert t >> (cheese_exp | ham_exp | eggs_exp) not in timeline
timeline.mark('eggs')
assert eggs_exp | ham_exp in timeline
assert ham_exp | eggs_exp in timeline
assert cheese_exp | ham_exp | eggs_exp in timeline
with timeline.frozen():
assert t >> (eggs_exp | ham_exp) in timeline
assert t >> (ham_exp | eggs_exp) in timeline
assert t >> (cheese_exp | ham_exp | eggs_exp) in timeline
timeline.mark('cheese')
assert eggs_exp | ham_exp in timeline
assert ham_exp | eggs_exp in timeline
assert cheese_exp | ham_exp | eggs_exp in timeline
timeline = make_timeline()
with timeline.frozen():
assert t >> (eggs_exp | ham_exp) in timeline
assert t >> (ham_exp | eggs_exp) in timeline
assert t >> (cheese_exp | ham_exp | eggs_exp) in timeline
timeline.mark('ham')
assert eggs_exp | ham_exp in timeline
assert ham_exp | eggs_exp in timeline
assert cheese_exp | ham_exp | eggs_exp in timeline
timeline = make_timeline()
with timeline.frozen():
assert t >> (eggs_exp | ham_exp) in timeline
assert t >> (ham_exp | eggs_exp) in timeline
assert t >> (cheese_exp | ham_exp | eggs_exp) in timeline
t = timeline.last
timeline.mark('cheese')
assert eggs_exp | ham_exp not in timeline
assert ham_exp | eggs_exp not in timeline
assert cheese_exp | ham_exp | eggs_exp in timeline
with timeline.frozen():
assert t >> (eggs_exp | ham_exp) not in timeline
assert t >> (ham_exp | eggs_exp) not in timeline
assert t >> (cheese_exp | ham_exp | eggs_exp) in timeline
def test_xor(make_timeline):
@ -243,60 +336,44 @@ def test_xor(make_timeline):
cheese_exp = Mark('cheese')
timeline = make_timeline()
assert eggs_exp ^ ham_exp not in timeline
assert ham_exp ^ eggs_exp not in timeline
assert cheese_exp ^ ham_exp ^ eggs_exp not in timeline
t1 = timeline.beginning
with timeline.frozen():
assert t1 >> (eggs_exp ^ ham_exp) not in timeline
assert t1 >> (ham_exp ^ eggs_exp) not in timeline
assert t1 >> (cheese_exp ^ ham_exp ^ eggs_exp) not in timeline
timeline.mark('eggs')
assert eggs_exp ^ ham_exp in timeline
assert ham_exp ^ eggs_exp in timeline
assert cheese_exp ^ ham_exp ^ eggs_exp in timeline
with timeline.frozen():
assert t1 >> (eggs_exp ^ ham_exp) in timeline
assert t1 >> (ham_exp ^ eggs_exp) in timeline
assert t1 >> (cheese_exp ^ ham_exp ^ eggs_exp) in timeline
t2 = timeline.last
timeline.mark('ham')
assert eggs_exp ^ ham_exp not in timeline
assert ham_exp ^ eggs_exp not in timeline
assert cheese_exp ^ ham_exp ^ eggs_exp not in timeline
with timeline.frozen():
assert t1 >> (eggs_exp ^ ham_exp) in timeline
assert t2 >> (eggs_exp ^ ham_exp) not in timeline
assert t1 >> (ham_exp ^ eggs_exp) in timeline
assert t2 >> (ham_exp ^ eggs_exp) not in timeline
assert t1 >> (cheese_exp ^ ham_exp ^ eggs_exp) in timeline
assert t2 >> (cheese_exp ^ ham_exp ^ eggs_exp) not in timeline
def test_conditional(make_timeline):
def is_exciting(occ):
return occ.circumstances in Pattern(('Event', ANY, 'exciting'))
return occ.circumstances == Pattern(('Event', ANY, 'exciting'))
something = Event('something', ANY)
something_exciting = something.when(is_exciting)
timeline = make_timeline()
t = timeline.beginning
timeline.record_event('something', 'boring')
assert something in timeline
assert something_exciting not in timeline
with timeline.frozen():
assert t >> something in timeline
assert t >> something_exciting not in timeline
timeline.record_event('something', 'exciting')
assert something_exciting in timeline
def test_after(make_timeline):
timeline = make_timeline()
first = timeline.mark('first')
second_exp = first >> Mark('second')
assert second_exp not in timeline
timeline.mark('second')
assert second_exp in timeline
def test_before(make_timeline):
timeline = make_timeline()
first = timeline.mark('first')
timeline.mark('second')
assert Mark('second') >> Mark('first') not in timeline
assert Mark('second') >> first not in timeline
third = timeline.mark('third')
assert Mark('second') >> Mark('first') not in timeline
assert Mark('second') >> first not in timeline
assert Mark('second') >> Mark('third') in timeline
assert Mark('second') >> third in timeline
with timeline.frozen():
assert t >> something_exciting in timeline

View file

@ -4,68 +4,132 @@
from __future__ import print_function, with_statement, absolute_import
import contextlib
import itertools
import threading
import time
# This is only imported to ensure that the module is actually installed and the
# timeout setting in pytest.ini is active, since otherwise most timeline-based
# tests will hang indefinitely.
import pytest_timeout # noqa
from pytests.helpers import print, timestamp
import pytests.helpers.pattern as pattern
class Timeline(object):
def __init__(self):
self._cvar = threading.Condition()
self._is_frozen = False
self.index_iter = itertools.count(1)
self._last = None
self._is_frozen = False
self._is_final = False
self.beginning = None # needed for mark() below
self.beginning = self.mark('begin')
def assert_frozen(self):
assert self.is_frozen, 'Timeline can only be inspected while frozen()'
@property
def last(self):
with self._cvar:
self.assert_frozen()
return self._last
def history(self):
result = list(self.last().backtrack())
result.reverse()
return result
self.assert_frozen()
return list(self.beginning.and_following())
def __contains__(self, expectations):
try:
iter(expectations)
except TypeError:
expectations = (expectations,)
assert all(isinstance(exp, Expectation) for exp in expectations)
last = self.last()
return all(exp.has_occurred_by(last) for exp in expectations)
def all_occurrences_of(self, expectation):
occs = [occ for occ in self.history() if occ.realizes(expectation)]
return tuple(occs)
def _record(self, occurrence):
assert isinstance(occurrence, Occurrence)
assert occurrence.timeline is self
assert occurrence.preceding is None
def __contains__(self, expectation):
assert expectation.has_lower_bound, (
'Expectation must have a lower time bound to be used with "in"! '
'Use >> to sequence an expectation against an occurrence to establish a lower bound, '
'or use has_been_realized_in() to test for unbounded expectations in the timeline.'
)
return expectation.has_been_realized_in(self)
def __getitem__(self, index):
assert index is slice
assert index.step is None
start = index.start or self.beginning
stop = index.stop
if stop is None:
assert self._is_frozen
stop = self._last
return self.Interval(self, start, stop)
def wait_until(self, condition):
with self._cvar:
assert not self._is_frozen
occurrence.timestamp = time.clock()
occurrence.preceding = self._last
self._last = occurrence
self._cvar.notify_all()
def freeze(self):
with self._cvar:
self._is_frozen = True
def wait_until(self, expectation):
assert isinstance(expectation, Expectation)
with self._cvar:
while expectation not in self:
while True:
with self.frozen():
if condition():
break
assert not self._is_final
self._cvar.wait()
return self._last
def wait_for(self, expectation):
assert expectation.has_lower_bound, (
'Expectation must have a lower time bound to be used with wait_for()!'
'Use >> to sequence an expectation against an occurrence to establish a lower bound.'
)
print('Waiting for %r' % expectation)
return self.wait_until(lambda: expectation in self)
def _record(self, occurrence):
t = timestamp()
assert isinstance(occurrence, Occurrence)
assert occurrence.timeline is self
assert occurrence.timestamp is None
with self._cvar:
assert not self._is_final
occurrence.timestamp = t
occurrence.index = next(self.index_iter)
if self._last is None:
self.beginning = occurrence
self._last = occurrence
else:
occurrence.previous = self._last
self._last._next = occurrence
self._last = occurrence
self._cvar.notify_all()
@contextlib.contextmanager
def frozen(self):
with self._cvar:
was_frozen = self._is_frozen
self._is_frozen = True
yield
self._is_frozen = was_frozen
@property
def is_frozen(self):
return self._is_frozen
def finalize(self):
with self._cvar:
self._is_final = True
self._is_frozen = True
@property
def is_finalized(self):
return self._is_finalized
def __repr__(self):
return '|' + ' >> '.join(repr(occ) for occ in self.history()) + '|'
with self.frozen():
return '|' + ' >> '.join(repr(occ) for occ in self.history()) + '|'
def __str__(self):
return '\n'.join(repr(occ) for occ in self.history())
with self.frozen():
return '\n'.join(repr(occ) for occ in self.history())
def __data__(self):
return self.history()
with self.frozen():
return self.history()
def mark(self, id):
occ = Occurrence(self, 'Mark', id)
@ -76,11 +140,7 @@ class Timeline(object):
occ = Occurrence(self, 'Request', command, arguments)
occ.command = command
occ.arguments = arguments
def wait_for_response():
self.wait_until(Response(occ, pattern.ANY))
occ.wait_for_response = wait_for_response
occ.wait_for_response = lambda: Response(occ, pattern.ANY).wait()
return occ
def record_response(self, request, body):
@ -97,76 +157,79 @@ class Timeline(object):
occ.body = body
return occ
class Interval(tuple):
def __init__(self, timeline, start, stop):
assert isinstance(start, Occurrence)
assert isinstance(stop, Occurrence)
class Occurrence(object):
def __init__(self, timeline, *circumstances):
assert circumstances
self.timeline = timeline
self.preceding = None
self.timestamp = None
self.circumstances = circumstances
timeline._record(self)
assert self.timestamp is not None
if start is stop:
occs = ()
else:
assert start < stop
occs = tuple(self.start.and_following(until=self.stop))
super(Timeline.Interval, self).__init__(occs)
def backtrack(self, until=None):
assert until is None or isinstance(until, Occurrence)
occ = self
while occ is not until:
yield occ
occ = occ.preceding
self.timeline = timeline
self.start = start
self.stop = stop
def precedes(self, occurrence):
assert isinstance(occurrence, Occurrence)
preceding = occurrence.backtrack()
next(preceding)
return any(occ is self for occ in preceding)
def follows(self, occurrence):
assert isinstance(occurrence, Occurrence)
return occurrence.precedes(self)
def __lt__(self, occurrence):
return self.precedes(occurrence)
def __gt__(self, occurrence):
return self.follows(occurrence)
def __le__(self, occurrence):
return self == occurrence or self < occurrence
def __ge__(self, occurrence):
return self == occurrence or self > occurrence
def __rshift__(self, expectation):
assert isinstance(expectation, Expectation)
return expectation.after(self)
def __hash__(self):
return hash(id(self))
def __data__(self):
return self.circumstances
def __repr__(self):
timestamp = int(self.timestamp * 100000)
return '@%06d:%s%r' % (timestamp, self.circumstances[0], self.circumstances[1:])
def __contains__(self, expectation):
occs = [occ for occ in self if expectation.is_realized_by(occ)]
occs.reverse()
return tuple(occs)
class Expectation(object):
def has_occurred_by(self, occurrence):
timeline = None
has_lower_bound = False
def is_realized_by(self, occurrence):
raise NotImplementedError()
def after(self, other):
return BoundedExpectation(self, must_follow=other)
def is_realized_by_any_of(self, occurrences):
return any(self.is_realized_by(occ) for occ in occurrences)
def before(self, other):
return BoundedExpectation(self, must_precede=other)
def has_been_realized_before(self, occurrence):
return self.is_realized_by_any_of(occurrence.preceding())
def has_been_realized_after(self, occurrence):
return self.is_realized_by_any_of(occurrence.following())
def has_been_realized_at_or_before(self, occurrence):
return self.is_realized_by_any_of(occurrence.and_preceding())
def has_been_realized_at_or_after(self, occurrence):
return self.is_realized_by_any_of(occurrence.and_following())
def has_been_realized_in(self, timeline):
return timeline.all_occurrences_of(self) != ()
def wait(self):
assert self.timeline and self.has_lower_bound, (
'Expectation must belong to a timeline and have a lower time bound to be used wait()! '
'Use >> to sequence an expectation against an occurrence to establish a lower bound.'
)
return self.timeline.wait_for(self)
def __eq__(self, other):
if self is other:
return True
elif isinstance(other, Occurrence) and self.is_realized_by(other):
return True
else:
return NotImplemented
def __ne__(self, other):
return not self == other
def after(self, other):
return SequencedExpectation(self, only_after=other)
def when(self, condition):
return ConditionalExpectation(self, condition)
def __rshift__(self, other):
return self.before(other)
return self if other is None else other.after(self)
def __and__(self, other):
assert isinstance(other, Expectation)
@ -180,80 +243,86 @@ class Expectation(object):
assert isinstance(other, Expectation)
return XorExpectation(self, other)
def __invert__(self):
return NotExpectation(self)
def __repr__(self):
raise NotImplementedError()
class BoundedExpectation(Expectation):
def __init__(self, expectation, must_follow=None, must_precede=None):
self.expectation = expectation
self.must_follow = Occurred(must_follow) if isinstance(must_follow, Occurrence) else must_follow
self.must_precede = Occurred(must_precede) if isinstance(must_precede, Occurrence) else must_precede
assert isinstance(self.expectation, Expectation)
assert self.must_follow is None or isinstance(self.must_follow, Expectation)
assert self.must_precede is None or isinstance(self.must_precede, Expectation)
def has_occurred_by(self, occurrence):
assert isinstance(occurrence, Occurrence)
expectation = self.expectation
must_follow = self.must_follow
must_precede = self.must_precede
occ = occurrence
if must_precede is not None:
for occ in occ.backtrack():
if not must_precede.has_occurred_by(occ):
break
else:
return False
for occ in occ.backtrack():
if expectation.has_occurred_by(occ):
break
else:
return False
return must_follow is None or must_follow.has_occurred_by(occ)
def __repr__(self):
s = '('
if self.must_follow is not None:
s += repr(self.must_follow) + ' >> '
s += repr(self.expectation)
if self.must_precede is not None:
s += ' >> ' + repr(self.must_precede)
s += ')'
return s
class AndExpectation(Expectation):
class DerivativeExpectation(Expectation):
def __init__(self, *expectations):
self.expectations = expectations
assert len(expectations) > 0
assert all(isinstance(exp, Expectation) for exp in expectations)
self.expectations = expectations
def has_occurred_by(self, occurrence):
timelines = {id(exp.timeline): exp.timeline for exp in expectations}
timelines.pop(id(None), None)
if len(timelines) > 1:
print('Cannot mix expectations from multiple timelines:')
for tl_id, tl in timelines.items():
print('\n %d: %r' % (tl_id, tl))
print()
raise ValueError('Cannot mix expectations from multiple timelines')
for tl in timelines.values():
self.timeline = tl
@property
def has_lower_bound(self):
return all(exp.has_lower_bound for exp in self.expectations)
class SequencedExpectation(DerivativeExpectation):
def __init__(self, expectation, only_after):
super(SequencedExpectation, self).__init__(expectation, only_after)
@property
def expectation(self):
return self.expectations[0]
@property
def only_after(self):
return self.expectations[1]
def is_realized_by(self, occurrence):
assert isinstance(occurrence, Occurrence)
return all(exp.has_occurred_by(occurrence) for exp in self.expectations)
return (
occurrence.realizes(self.expectation) and
self.only_after.has_been_realized_before(occurrence)
)
def __and__(self, other):
assert isinstance(other, Expectation)
return AndExpectation(*(self.expectations + (other,)))
@property
def has_lower_bound(self):
return self.expectation.has_lower_bound or self.only_after.has_lower_bound
def __repr__(self):
return '(' + ' & '.join(repr(exp) for exp in self.expectations) + ')'
return '(%r >> %r)' % (self.only_after, self.expectation)
class OrExpectation(Expectation):
def __init__(self, *expectations):
assert len(expectations) > 0
assert all(isinstance(exp, Expectation) for exp in expectations)
self.expectations = expectations
class NotExpectation(DerivativeExpectation):
def __init__(self, expectation):
super(NotExpectation, self).__init__(expectation)
def has_occurred_by(self, occurrence):
@property
def expectation(self):
return self.expectations[0]
def is_realized_by(self, occurrence):
assert isinstance(occurrence, Occurrence)
return any(exp.has_occurred_by(occurrence) for exp in self.expectations)
return not occurrence.realizes(self.expectation)
@property
def has_lower_bound(self):
return self.expectation.has_lower_bound
def __repr__(self):
return '~%r' % self.expectation
class OrExpectation(DerivativeExpectation):
def is_realized_by(self, occurrence):
assert isinstance(occurrence, Occurrence)
return any(occurrence.realizes(exp) for exp in self.expectations)
def __or__(self, other):
assert isinstance(other, Expectation)
@ -263,15 +332,55 @@ class OrExpectation(Expectation):
return '(' + ' | '.join(repr(exp) for exp in self.expectations) + ')'
class XorExpectation(Expectation):
def __init__(self, *expectations):
assert len(expectations) > 0
assert all(isinstance(exp, Expectation) for exp in expectations)
self.expectations = expectations
def has_occurred_by(self, occurrence):
class AndExpectation(DerivativeExpectation):
def is_realized_by(self, occurrence):
assert isinstance(occurrence, Occurrence)
return sum(exp.has_occurred_by(occurrence) for exp in self.expectations) == 1
# At least one expectation must be realized by the occurrence.
expectations = list(self.expectations)
for exp in expectations:
if occurrence.realizes(exp):
break
else:
return False
# And then all of the remaining expectations must have been realized
# at or sometime before that occurrence.
expectations.remove(exp)
return all(exp.has_been_realized_at_or_before(occurrence) for exp in expectations)
@property
def has_lower_bound(self):
return any(exp.has_lower_bound for exp in self.expectations)
def __and__(self, other):
assert isinstance(other, Expectation)
return AndExpectation(*(self.expectations + (other,)))
def __repr__(self):
return '(' + ' & '.join(repr(exp) for exp in self.expectations) + ')'
class XorExpectation(DerivativeExpectation):
def is_realized_by(self, occurrence):
assert isinstance(occurrence, Occurrence)
# At least one expectation must be realized by the occurrence.
expectations = list(self.expectations)
for exp in expectations:
if occurrence.realizes(exp):
break
else:
return False
# And then none of the remaining expectations must have been realized
# at or sometime before that occurrence.
expectations.remove(exp)
return not any(exp.has_been_realized_at_or_before(occurrence) for exp in expectations)
@property
def has_lower_bound(self):
return all(exp.has_lower_bound for exp in self.expectations)
def __xor__(self, other):
assert isinstance(other, Expectation)
@ -281,62 +390,153 @@ class XorExpectation(Expectation):
return '(' + ' ^ '.join(repr(exp) for exp in self.expectations) + ')'
class ConditionalExpectation(Expectation):
class ConditionalExpectation(DerivativeExpectation):
def __init__(self, expectation, condition):
assert isinstance(expectation, Expectation)
self.expectation = expectation
self.condition = condition
super(ConditionalExpectation, self).__init__(expectation)
def has_occurred_by(self, occurrence):
@property
def expectation(self):
return self.expectations[0]
def is_realized_by(self, occurrence):
assert isinstance(occurrence, Occurrence)
return self.condition(occurrence) and self.expectation.has_occurred_by(occurrence)
return self.condition(occurrence) and occurrence.realizes(self.expectation)
def __repr__(self):
return '%r?' % self.expectation
class BasicExpectation(Expectation):
class PatternExpectation(Expectation):
def __init__(self, *circumstances):
self.circumstances = pattern.Pattern(circumstances)
def has_occurred_by(self, occurrence):
def is_realized_by(self, occurrence):
assert isinstance(occurrence, Occurrence)
return any(
occ.circumstances
in self.circumstances
for occ in occurrence.backtrack()
)
return occurrence.circumstances == self.circumstances
def __repr__(self):
circumstances = self.circumstances.pattern
return '%s%r' % (circumstances[0], circumstances[1:])
class Occurred(BasicExpectation):
def __init__(self, occurrence):
assert isinstance(occurrence, Occurrence)
self.occurrence = occurrence
def has_occurred_by(self, occurrence):
assert isinstance(occurrence, Occurrence)
return any(occ is self.occurrence for occ in occurrence.backtrack())
def __repr__(self):
return 'Occurred(%r)' % self.occurrence
def Mark(id):
return BasicExpectation('Mark', id)
return PatternExpectation('Mark', id)
def Request(command, arguments=pattern.ANY):
return BasicExpectation('Request', command, arguments)
return PatternExpectation('Request', command, arguments)
def Response(request, body=pattern.ANY):
assert isinstance(request, Expectation) or isinstance(request, Occurrence)
return BasicExpectation('Response', request, body)
exp = PatternExpectation('Response', request, body)
exp.timeline = request.timeline
exp.has_lower_bound = request.has_lower_bound
return exp
def Event(event, body=pattern.ANY):
return BasicExpectation('Event', event, body)
return PatternExpectation('Event', event, body)
class Occurrence(Expectation):
has_lower_bound = True
def __init__(self, timeline, *circumstances):
assert circumstances
self.timeline = timeline
self.previous = None
self._next = None
self.timestamp = None
self.index = None
self.circumstances = circumstances
timeline._record(self)
assert self.timestamp is not None
@property
def next(self):
with self.timeline.frozen():
occ = self._next
was_last = occ is self.timeline.last
if was_last:
# The .next property of the last occurrence in a timeline can change
# at any moment when timeline isn't frozen. So if it wasn't frozen by
# the caller, this was an unsafe operation, and we should complain.
self.timeline.assert_frozen()
return occ
def preceding(self):
it = self.and_preceding()
next(it)
return it
def and_preceding(self, up_to=None):
assert up_to is None or isinstance(up_to, Expectation)
if isinstance(up_to, Occurrence):
assert self > up_to
occ = self
while occ != up_to:
yield occ
occ = occ.previous
def following(self):
self.timeline.assert_frozen()
it = self.and_following()
next(it)
return it
def and_following(self, up_to=None):
assert up_to is None or isinstance(up_to, Expectation)
self.timeline.assert_frozen()
if isinstance(up_to, Occurrence):
assert self < up_to
occ = self
while occ != up_to:
yield occ
occ = occ.next
def precedes(self, occurrence):
assert isinstance(occurrence, Occurrence)
return any(occ is self for occ in occurrence.preceding())
def follows(self, occurrence):
assert isinstance(occurrence, Occurrence)
return any(occ is self for occ in occurrence.following())
def realizes(self, expectation):
assert isinstance(expectation, Expectation)
return expectation.is_realized_by(self)
def await_following(self, expectation):
assert isinstance(expectation, Expectation)
expectation = self >> expectation
return self.timeline.wait_for(expectation)
def is_realized_by(self, other):
return self is other
def __lt__(self, occurrence):
return self.precedes(occurrence)
def __gt__(self, occurrence):
return self.follows(occurrence)
def __le__(self, occurrence):
return self is occurrence or self < occurrence
def __ge__(self, occurrence):
return self is occurrence or self > occurrence
def __rshift__(self, expectation):
assert isinstance(expectation, Expectation)
return expectation.after(self)
def __hash__(self):
return hash(id(self))
def __data__(self):
return self.circumstances
def __repr__(self):
return '%d!%s%r' % (self.index, self.circumstances[0], self.circumstances[1:])