Mirror of https://github.com/microsoft/debugpy.git
Add a command for checking the vendored schema file.
commit 8985aeead0 (parent b8cf3d3e9e)

14 changed files with 760 additions and 235 deletions
Makefile | 4
@@ -20,3 +20,7 @@ test: ## Run the test suite.
 .PHONY: coverage
 coverage: ## Check line coverage.
 	$(PYTHON) -m coverage run -m tests
+
+.PHONY: check-schemafile
+check-schemafile: ## Validate the vendored schema file.
+	python3 -m debugger_protocol.schema check

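For reference, the new target just wraps the package's CLI; a successful run should look like this (transcript reconstructed from handle_check's print calls below, not captured from a live checkout):

    $ make check-schemafile
    python3 -m debugger_protocol.schema check
    checking local schema file...
    comparing with upstream schema file...
    schema file okay
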
debugger_protocol/schema/__init__.py

@@ -2,7 +2,3 @@ import os.path
 
 DATA_DIR = os.path.dirname(__file__)
 
-UPSTREAM = 'https://github.com/Microsoft/vscode-debugadapter-node/raw/master/debugProtocol.json'  # noqa
-VENDORED = os.path.join(DATA_DIR, 'debugProtocol.json')
-METADATA = os.path.join(DATA_DIR, 'UPSTREAM')
-

debugger_protocol/schema/__main__.py

@@ -1,10 +1,10 @@
 import argparse
-import os.path
 import sys
 
-from . import (UPSTREAM, VENDORED, METADATA,
-               upstream)
 from ._util import open_url
+from .metadata import open_metadata
+from .upstream import URL as UPSTREAM, download
+from .vendored import FILENAME as VENDORED, check_local, check_upstream
 
 
 COMMANDS = {}
@@ -18,20 +18,33 @@ def as_command(name):
 
 
 @as_command('download')
-def handle_download(source=UPSTREAM, target=VENDORED):
+def handle_download(source=UPSTREAM, target=VENDORED, *,
+                    _open=open, _open_url=open_url):
     # Download the schema file.
-    with open_url(source) as infile:
-        with open(target, 'wb') as outfile:
-            meta = upstream.download(source, infile, outfile)
+    with _open_url(source) as infile:
+        with _open(target, 'wb') as outfile:
+            meta = download(source, infile, outfile,
+                            _open=_open)
 
     # Save the metadata.
-    filename = os.path.join(os.path.dirname(target),
-                            os.path.basename(METADATA))
-    with open(filename, 'w') as metafile:
+    metafile, _ = open_metadata(target, 'w',
+                                _open=_open)
+    with metafile:
         metafile.write(
             meta.format())
 
 
+@as_command('check')
+def handle_check(schemafile=VENDORED, *, _open=open, _open_url=open_url):
+    print('checking local schema file...')
+    check_local(schemafile,
+                _open=_open)
+    print('comparing with upstream schema file...')
+    check_upstream(schemafile,
+                   _open=_open, _open_url=_open_url)
+    print('schema file okay')
+
+
 #############################
 # the script
@@ -58,6 +71,9 @@ def parse_args(argv=sys.argv[1:], prog=None):
     download.add_argument('--source', default=UPSTREAM)
     download.add_argument('--target', default=VENDORED)
 
+    check = subs.add_parser('check')
+    check.add_argument('--schemafile', default=VENDORED)
+
     args = parser.parse_args(argv)
     if args.command is None:
         parser.print_help()

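The body of as_command sits outside these hunks, so the registry's shape is not shown; judging from its use above and from the COMMANDS assertion in the tests below, a plausible sketch (hypothetical reconstruction, not the committed code) is:

    COMMANDS = {}

    def as_command(name):
        # Hypothetical: register the decorated handler under the
        # given subcommand name (actual body not shown in this diff).
        def decorator(handle):
            COMMANDS[name] = handle
            return handle
        return decorator

With that wiring, python3 -m debugger_protocol.schema check resolves the 'check' subcommand to handle_check().
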
debugger_protocol/schema/_util.py

@@ -12,7 +12,7 @@ def open_url(url):
 def get_revision(url, *, _open=open_url):
     """Return the revision corresponding to the given URL."""
     if url.startswith('https://github.com/'):
-        return get_github_revision(url, _open=_open)
+        return github_get_revision(url, _open=_open)
     else:
         raise NotImplementedError
@@ -35,7 +35,7 @@ GH_RESOURCE_RE = re.compile(r'^https://github.com'
                             r'/(?P<path>.*)$')
 
 
-def get_github_revision(url, *, _open=open_url):
+def github_get_revision(url, *, _open=open_url):
     """Return the full commit hash corresponding to the given URL."""
     m = GH_RESOURCE_RE.match(url)
     if not m:
@@ -48,3 +48,13 @@ def get_github_revision(url, *, _open=open_url):
         raw = revinfo.read()
     data = json.loads(raw.decode())
     return data['sha']
+
+
+def github_url_replace_ref(url, newref):
+    """Return a new URL with the ref replaced."""
+    m = GH_RESOURCE_RE.match(url)
+    if not m:
+        raise ValueError('invalid GitHub resource URL: {!r}'.format(url))
+    org, repo, kind, _, path = m.groups()
+    parts = (org, repo, kind, newref, path)
+    return 'https://github.com/' + '/'.join(parts)

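A quick illustration of the new helper, assuming the package is importable; the expected result matches the test_url case in test_metadata.py below:

    from debugger_protocol.schema._util import github_url_replace_ref

    url = 'https://github.com/Microsoft/vscode-debugadapter-node/raw/master/debugProtocol.json'
    print(github_url_replace_ref(url, 'abcdef0123456789'))
    # -> https://github.com/Microsoft/vscode-debugadapter-node/raw/abcdef0123456789/debugProtocol.json
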
debugger_protocol/schema/file.py | 15 (new file)

@@ -0,0 +1,15 @@
+
+
+class SchemaFileError(Exception):
+    """A schema-file-related operation failed."""
+
+
+def read_schema(filename, *, _open=open):
+    """Return the data (bytes) in the given schema file."""
+    try:
+        schemafile = _open(filename, 'rb')
+    except FileNotFoundError as exc:
+        raise SchemaFileError(
+            'schema file {!r} not found'.format(filename))
+    with schemafile:
+        return schemafile.read()

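A minimal usage sketch, assuming a vendored copy exists at the default location (FILENAME comes from vendored.py below):

    from debugger_protocol.schema.file import SchemaFileError, read_schema
    from debugger_protocol.schema.vendored import FILENAME

    try:
        data = read_schema(FILENAME)
    except SchemaFileError as exc:
        print('cannot read schema:', exc)
    else:
        print('read', len(data), 'schema bytes')
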
debugger_protocol/schema/metadata.py | 117 (new file)

@@ -0,0 +1,117 @@
+from collections import namedtuple
+from datetime import datetime
+import os.path
+from textwrap import dedent
+
+from ._util import github_url_replace_ref
+
+
+class MetadataError(Exception):
+    """A metadata-related operation failed."""
+
+
+def open_metadata(schemafile, mode='r', *, _open=open):
+    """Return a file object for the metadata of the given schema file.
+
+    Also return the metadata file's filename.
+    """
+    from .vendored import METADATA
+    filename = os.path.join(os.path.dirname(schemafile),
+                            os.path.basename(METADATA))
+    try:
+        return _open(filename), filename
+    except FileNotFoundError as exc:
+        raise MetadataError(
+            'metadata file for {!r} not found'.format(schemafile))
+
+
+def read_metadata(schemafile, *, _open=open):
+    """Return the metadata corresponding to the schema file.
+
+    Also return the path to the metadata file.
+    """
+    metafile, filename = open_metadata(schemafile, _open=_open)
+    with metafile:
+        data = metafile.read()
+
+    try:
+        meta = Metadata.parse(data)
+    except Exception as exc:
+        raise MetadataError(
+            'metadata file {!r} not valid: {}'.format(filename, exc))
+
+    return meta, filename
+
+
+class Metadata(namedtuple('Metadata', 'upstream revision checksum date')):
+    """Info about the local copy of the upstream schema file."""
+
+    TIMESTAMP = '%Y-%m-%d %H:%M:%S (UTC)'
+
+    FORMAT = dedent("""\
+        upstream: {}
+        revision: {}
+        checksum: {}
+        date: {:%s}
+        """) % TIMESTAMP
+
+    @classmethod
+    def parse(cls, data):
+        """Return an instance based on the given metadata string."""
+        lines = data.splitlines()
+
+        kwargs = {}
+        for line in lines:
+            line = line.strip()
+            if line.startswith('#'):
+                continue
+            if not line:
+                continue
+            field, _, value = line.partition(':')
+            kwargs[field] = value.strip()
+        self = cls(**kwargs)
+        return self
+
+    def __new__(cls, upstream, revision, checksum, date):
+        # coercion
+        upstream = str(upstream) if upstream else None
+        revision = str(revision) if revision else None
+        checksum = str(checksum) if checksum else None
+        if not date:
+            date = None
+        elif isinstance(date, str):
+            date = datetime.strptime(date, cls.TIMESTAMP)
+        elif date.tzinfo is not None:
+            date -= date.utcoffset()
+
+        self = super().__new__(cls, upstream, revision, checksum, date)
+        return self
+
+    def __init__(self, *args, **kwargs):
+        # validation
+
+        if not self.upstream:
+            raise ValueError('missing upstream URL')
+        # TODO ensure upstream is URL?
+
+        if not self.revision:
+            raise ValueError('missing upstream revision')
+        # TODO ensure revision is a hash?
+
+        if not self.checksum:
+            raise ValueError('missing checksum')
+        # TODO ensure checksum is a MD5 hash?
+
+        if not self.date:
+            raise ValueError('missing date')
+
+    @property
+    def url(self):
+        if self.upstream.startswith('https://github.com/'):
+            return github_url_replace_ref(self.upstream, self.revision)
+        else:
+            raise NotImplementedError
+
+    def format(self):
+        """Return a string containing the formatted metadata."""
+        return self.FORMAT.format(*self)

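parse() and format() are designed to round-trip, which is what lets the check command compare the vendored file against its recorded metadata; a small sketch mirroring the round-trip tests below:

    from datetime import datetime
    from debugger_protocol.schema.metadata import Metadata

    meta = Metadata('https://x.y.z/schema.json',
                    'abcdef0123456789',
                    'deadbeefdeadbeefdeadbeefdeadbeef',
                    datetime(2018, 1, 9, 13, 10, 59))
    assert Metadata.parse(meta.format()) == meta
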
debugger_protocol/schema/upstream.py

@@ -1,9 +1,13 @@
-from collections import namedtuple
 from datetime import datetime
-from textwrap import dedent
+import io
+import urllib.error
 
-from . import UPSTREAM
 from ._util import open_url, get_revision, get_checksum
+from .file import SchemaFileError
+from .metadata import Metadata
 
 
+URL = 'https://github.com/Microsoft/vscode-debugadapter-node/raw/master/debugProtocol.json'  # noqa
+
+
 def download(source, infile, outfile, *, _now=datetime.utcnow, _open=open_url):
@@ -18,80 +22,14 @@ def download(source, infile, outfile, *, _now=datetime.utcnow, _open=open_url):
     return Metadata(source, revision, checksum, date)
 
 
-class Metadata(namedtuple('Metadata', 'upstream revision checksum date')):
-    """Info about the local copy of the upstream schema file."""
-
-    TIMESTAMP = '%Y-%m-%d %H:%M:%S (UTC)'
-
-    FORMAT = dedent("""\
-        upstream: {}
-        revision: {}
-        checksum: {}
-        date: {:%s}
-        """) % TIMESTAMP
-
-    #@get_revision(upstream)
-    #@download(upstream, revision=None)
-    #validate_file(filename)
-    #verify_remote()
-
-    @classmethod
-    def parse(cls, data):
-        """Return an instance based on the given metadata string."""
-        lines = data.splitlines()
-
-        kwargs = {}
-        for line in lines:
-            line = line.strip()
-            if line.startswith('#'):
-                continue
-            if not line:
-                continue
-            field, _, value = line.partition(':')
-            kwargs[field] = value.strip()
-        self = cls(**kwargs)
-        return self
-
-    def __new__(cls, upstream, revision, checksum, date):
-        # coercion
-        upstream = str(upstream) if upstream else None
-        revision = str(revision) if revision else None
-        checksum = str(checksum) if checksum else None
-        if not date:
-            date = None
-        elif isinstance(date, str):
-            date = datetime.strptime(date, cls.TIMESTAMP)
-        elif date.tzinfo is not None:
-            date -= date.utcoffset()
-
-        self = super().__new__(cls, upstream, revision, checksum, date)
-        return self
-
-    def __init__(self, *args, **kwargs):
-        # validation
-
-        if not self.upstream:
-            raise ValueError('missing upstream URL')
-        # TODO ensure upstream is URL?
-
-        if not self.revision:
-            raise ValueError('missing upstream revision')
-        # TODO ensure revision is a hash?
-
-        if not self.checksum:
-            raise ValueError('missing checksum')
-        # TODO ensure checksum is a MD5 hash?
-
-        if not self.date:
-            raise ValueError('missing date')
-
-    @property
-    def url(self):
-        if self.upstream == UPSTREAM:
-            return self.upstream.replace('master', self.revision)
-        else:
-            raise NotImplementedError
-
-    def format(self):
-        """Return a string containing the formatted metadata."""
-        return self.FORMAT.format(*self)
+def read(url, *, _open_url=open_url):
+    """Return (data, metadata) for the given upstream URL."""
+    outfile = io.BytesIO()
+    try:
+        infile = _open_url(url)
+    except (FileNotFoundError, urllib.error.HTTPError) as exc:
+        # XXX Ensure it's a 404 error?
+        raise SchemaFileError('schema file at {!r} not found'.format(url))
+    with infile:
+        upstream = download(url, infile, outfile, _open=_open_url)
+    return outfile.getvalue(), upstream

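read() is the network half of the checker: it downloads into an in-memory buffer rather than a file, so check_upstream never touches disk. Calling it directly would look like this (this hits the network for real, unlike the stubbed tests below):

    from debugger_protocol.schema.upstream import URL, read

    data, meta = read(URL)
    print(meta.revision, meta.checksum)
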
debugger_protocol/schema/vendored.py | 67 (new file)

@@ -0,0 +1,67 @@
+import os.path
+
+from . import DATA_DIR, upstream
+from ._util import open_url, get_checksum
+from .file import SchemaFileError, read_schema
+from .metadata import MetadataError, read_metadata
+
+
+FILENAME = os.path.join(DATA_DIR, 'debugProtocol.json')
+METADATA = os.path.join(DATA_DIR, 'UPSTREAM')
+
+
+class SchemaFileMismatchError(SchemaFileError, MetadataError):
+    """The schema file does not match expectations."""
+
+    @classmethod
+    def _build_message(cls, filename, actual, expected, upstream):
+        if upstream:
+            msg = ('local schema file {!r} does not match upstream {!r}'
+                   ).format(filename, expected.upstream)
+        else:
+            msg = ('schema file {!r} does not match metadata file'
+                   ).format(filename)
+
+        for field in actual._fields:
+            value = getattr(actual, field)
+            other = getattr(expected, field)
+            if value != other:
+                msg += (' ({} mismatch: {!r} != {!r})'
+                        ).format(field, value, other)
+                break
+
+        return msg
+
+    def __init__(self, filename, actual, expected, *, upstream=False):
+        super().__init__(
+            self._build_message(filename, actual, expected, upstream))
+        self.filename = filename
+        self.actual = actual
+        self.expected = expected
+        self.upstream = upstream
+
+
+def check_local(filename, *, _open=open):
+    """Ensure that the local schema file matches the local metadata file."""
+    # Get the vendored metadata and data.
+    meta, _ = read_metadata(filename, _open=_open)
+    data = read_schema(filename, _open=_open)
+
+    # Only worry about the checksum matching.
+    actual = meta._replace(
+        checksum=get_checksum(data))
+    if actual != meta:
+        raise SchemaFileMismatchError(filename, actual, meta)
+
+
+def check_upstream(filename, *, _open=open, _open_url=open_url):
+    """Ensure that the local metadata file matches the upstream schema file."""
+    # Get the vendored and upstream metadata.
+    meta, _ = read_metadata(filename, _open=_open)
+    _, upmeta = upstream.read(meta.upstream, _open_url=_open_url)
+
+    # Make sure the revision and checksum match.
+    if meta.revision != upmeta.revision:
+        raise SchemaFileMismatchError(filename, meta, upmeta, upstream=True)
+    if meta.checksum != upmeta.checksum:
+        raise SchemaFileMismatchError(filename, meta, upmeta, upstream=True)

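Together these implement the two halves of the new CLI command; a minimal sketch, assuming the vendored schema and its UPSTREAM metadata file are in place:

    from debugger_protocol.schema.vendored import (
        FILENAME, SchemaFileMismatchError, check_local, check_upstream)

    try:
        check_local(FILENAME)      # schema bytes vs. recorded checksum
        check_upstream(FILENAME)   # recorded revision/checksum vs. upstream
    except SchemaFileMismatchError as exc:
        print(exc)
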
tests/debugger_protocol/schema/helpers.py | 15 (new file)

@@ -0,0 +1,15 @@
+
+
+class StubOpener:
+
+    def __init__(self, *files):
+        self.files = list(files)
+        self.calls = []
+
+    def open(self, *args):
+        self.calls.append(args)
+
+        file = self.files.pop(0)
+        if file is None:
+            raise FileNotFoundError
+        return file

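StubOpener is what makes the _open/_open_url keyword hooks above pay off: a test hands it a queue of file objects (or None to simulate a missing file) and it records every call. For example (import path assumed from the test layout):

    import io
    from tests.debugger_protocol.schema.helpers import StubOpener

    opener = StubOpener(io.BytesIO(b'<a schema>'), None)
    f = opener.open('schema.json', 'rb')   # pops the first stub
    assert opener.calls == [('schema.json', 'rb')]
    # the next call pops None and raises FileNotFoundError
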
tests/debugger_protocol/schema/test___main__.py | 92 (new file)

@@ -0,0 +1,92 @@
+import contextlib
+import io
+from textwrap import dedent
+import unittest
+
+from .helpers import StubOpener
+from debugger_protocol.schema.__main__ import (
+    COMMANDS, handle_download, handle_check)
+
+
+class Outfile:
+
+    def __init__(self, initial):
+        self.written = initial
+
+    def write(self, data):
+        self.written += data
+        return len(data)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, *args):
+        pass
+
+
+class CommandRegistryTests(unittest.TestCase):
+
+    def test_commands(self):
+        self.assertEqual(set(COMMANDS), {
+            'download',
+            'check',
+        })
+
+
+class HandleDownloadTests(unittest.TestCase):
+
+    def test_default_args(self):
+        schemafile = io.BytesIO(b'<a schema>')
+        outfile = Outfile(b'')
+        buf = io.BytesIO(
+            b'{"sha": "fc2395ca3564fb2afded8d90ddbe38dad1bf86f1"}')
+        metafile = Outfile('')
+        opener = StubOpener(schemafile, outfile, buf, metafile)
+
+        stdout = io.StringIO()
+        with contextlib.redirect_stdout(stdout):
+            with contextlib.redirect_stderr(stdout):
+                handle_download(
+                    _open=opener.open, _open_url=opener.open)
+        metadata = '\n'.join(line
+                             for line in metafile.written.splitlines()
+                             if not line.startswith('date: '))
+
+        self.assertEqual(outfile.written, b'<a schema>')
+        self.assertEqual(metadata, dedent("""
+            upstream: https://github.com/Microsoft/vscode-debugadapter-node/raw/master/debugProtocol.json
+            revision: fc2395ca3564fb2afded8d90ddbe38dad1bf86f1
+            checksum: e778c3751f9d0bceaf8d5aa81e2c659f
+            """).strip())  # noqa
+        self.assertEqual(stdout.getvalue(), '')
+
+
+class HandleCheckTests(unittest.TestCase):
+
+    def test_default_args(self):
+        metadata = dedent("""
+            upstream: https://github.com/x/y/raw/master/z
+            revision: fc2395ca3564fb2afded8d90ddbe38dad1bf86f1
+            checksum: e778c3751f9d0bceaf8d5aa81e2c659f
+            date: 2018-01-09 13:10:59 (UTC)
+            """)
+        opener = StubOpener(
+            io.StringIO(metadata),
+            io.BytesIO(b'<a schema>'),  # local
+            io.StringIO(metadata),
+            io.BytesIO(b'<a schema>'),  # upstream
+            io.BytesIO(
+                b'{"sha": "fc2395ca3564fb2afded8d90ddbe38dad1bf86f1"}'),
+        )
+
+        stdout = io.StringIO()
+        with contextlib.redirect_stdout(stdout):
+            with contextlib.redirect_stderr(stdout):
+                handle_check(
+                    _open=opener.open, _open_url=opener.open)
+
+        self.assertEqual(stdout.getvalue(), dedent("""\
+            checking local schema file...
+            comparing with upstream schema file...
+            schema file okay
+            """))

tests/debugger_protocol/schema/test_file.py | 22 (new file)

@@ -0,0 +1,22 @@
+import io
+import unittest
+
+from .helpers import StubOpener
+from debugger_protocol.schema.file import SchemaFileError, read_schema
+
+
+class ReadSchemaTests(unittest.TestCase):
+
+    def test_success(self):
+        schemafile = io.BytesIO(b'<a schema>')
+        opener = StubOpener(schemafile)
+
+        data = read_schema('schema.json', _open=opener.open)
+
+        self.assertEqual(data, b'<a schema>')
+
+    def test_file_missing(self):
+        opener = StubOpener(None)
+
+        with self.assertRaises(SchemaFileError):
+            read_schema('schema.json', _open=opener.open)

tests/debugger_protocol/schema/test_metadata.py | 210 (new file)

@@ -0,0 +1,210 @@
+from datetime import datetime
+import io
+import os.path
+from textwrap import dedent
+import unittest
+
+from .helpers import StubOpener
+from debugger_protocol.schema.upstream import URL as UPSTREAM
+from debugger_protocol.schema.metadata import (
+    open_metadata, read_metadata,
+    MetadataError, Metadata)
+
+
+class Stringlike:
+
+    def __init__(self, value):
+        self.value = value
+
+    def __str__(self):
+        return self.value
+
+
+class Hash(Stringlike):
+    pass
+
+
+class OpenMetadataTests(unittest.TestCase):
+
+    def test_success(self):
+        expected = object()
+        opener = StubOpener(expected)
+        schemadir = os.path.join('x', 'y', 'z', '')
+        metafile, filename = open_metadata(schemadir + 'schema.json',
+                                           _open=opener.open)
+
+        self.assertIs(metafile, expected)
+        self.assertEqual(filename, schemadir + 'UPSTREAM')
+
+    def test_file_missing(self):
+        metafile = None
+        opener = StubOpener(metafile)
+
+        with self.assertRaises(MetadataError):
+            open_metadata('schema.json', _open=opener.open)
+
+
+class ReadMetadataTests(unittest.TestCase):
+
+    def test_success(self):
+        metafile = io.StringIO(dedent("""
+            upstream: https://x.y.z/schema.json
+            revision: abcdef0123456789
+            checksum: deadbeefdeadbeefdeadbeefdeadbeef
+            date: 2018-01-09 13:10:59 (UTC)
+            """))
+        opener = StubOpener(metafile)
+        schemadir = os.path.join('x', 'y', 'z', '')
+        meta, filename = read_metadata(schemadir + 'schema.json',
+                                       _open=opener.open)
+
+        self.assertEqual(meta,
+                         Metadata('https://x.y.z/schema.json',
+                                  'abcdef0123456789',
+                                  'deadbeefdeadbeefdeadbeefdeadbeef',
+                                  datetime(2018, 1, 9, 13, 10, 59),
+                                  ))
+        self.assertEqual(filename, schemadir + 'UPSTREAM')
+
+    def test_file_missing(self):
+        metafile = None
+        opener = StubOpener(metafile)
+
+        with self.assertRaises(MetadataError):
+            read_metadata('schema.json', _open=opener.open)
+
+    def test_file_invalid(self):
+        metafile = io.StringIO('<bogus>')
+        opener = StubOpener(metafile)
+
+        with self.assertRaises(MetadataError):
+            read_metadata('schema.json', _open=opener.open)
+
+
+class MetadataTests(unittest.TestCase):
+
+    def test_parse_minimal(self):
+        expected = Metadata('https://x.y.z/schema.json',
+                            'abcdef0123456789',
+                            'deadbeefdeadbeefdeadbeefdeadbeef',
+                            datetime(2018, 1, 9, 13, 10, 59),
+                            )
+        meta = Metadata.parse(dedent("""
+            upstream: https://x.y.z/schema.json
+            revision: abcdef0123456789
+            checksum: deadbeefdeadbeefdeadbeefdeadbeef
+            date: 2018-01-09 13:10:59 (UTC)
+            """))
+
+        self.assertEqual(meta, expected)
+
+    def test_parse_with_whitespace_and_comments(self):
+        expected = Metadata('https://x.y.z/schema.json',
+                            'abcdef0123456789',
+                            'deadbeefdeadbeefdeadbeefdeadbeef',
+                            datetime(2018, 1, 9, 13, 10, 59),
+                            )
+        meta = Metadata.parse(dedent("""
+
+            # generated by x.y.z
+            upstream: https://x.y.z/schema.json
+
+            revision: abcdef0123456789
+            checksum: deadbeefdeadbeefdeadbeefdeadbeef
+            date: 2018-01-09 13:10:59 (UTC)
+
+            # done!
+
+            """))  # noqa
+
+        self.assertEqual(meta, expected)
+
+    def test_parse_roundtrip_from_object(self):
+        orig = Metadata('https://x.y.z/schema.json',
+                        'abcdef0123456789',
+                        'deadbeefdeadbeefdeadbeefdeadbeef',
+                        datetime(2018, 1, 9, 13, 10, 59),
+                        )
+        meta = Metadata.parse(
+            orig.format())
+
+        self.assertEqual(meta, orig)
+
+    def test_parse_roundtrip_from_string(self):
+        orig = dedent("""\
+            upstream: https://x.y.z/schema.json
+            revision: abcdef0123456789
+            checksum: deadbeefdeadbeefdeadbeefdeadbeef
+            date: 2018-01-09 13:10:59 (UTC)
+            """)
+        data = (Metadata.parse(orig)
+                ).format()
+
+        self.assertEqual(data, orig)
+
+    def test_coercion_noop(self):
+        meta = Metadata('https://x.y.z/schema.json',
+                        'abcdef0123456789',
+                        'deadbeefdeadbeefdeadbeefdeadbeef',
+                        datetime(2018, 1, 9, 13, 10, 59),
+                        )
+
+        self.assertEqual(meta, (
+            'https://x.y.z/schema.json',
+            'abcdef0123456789',
+            'deadbeefdeadbeefdeadbeefdeadbeef',
+            datetime(2018, 1, 9, 13, 10, 59),
+            ))
+
+    def test_coercion_change_all(self):
+        meta = Metadata(Stringlike('https://x.y.z/schema.json'),
+                        Hash('abcdef0123456789'),
+                        Hash('deadbeefdeadbeefdeadbeefdeadbeef'),
+                        '2018-01-09 13:10:59 (UTC)',
+                        )
+
+        self.assertEqual(meta, (
+            'https://x.y.z/schema.json',
+            'abcdef0123456789',
+            'deadbeefdeadbeefdeadbeefdeadbeef',
+            datetime(2018, 1, 9, 13, 10, 59),
+            ))
+
+    def test_validation_fail(self):
+        baseargs = [
+            'https://x.y.z/schema.json',
+            'abcdef0123456789',
+            'deadbeefdeadbeefdeadbeefdeadbeef',
+            datetime(2018, 1, 9, 13, 10, 59),
+        ]
+        for i in range(len(baseargs)):
+            with self.subTest(baseargs[i]):
+                args = list(baseargs)
+                args[i] = ''
+                with self.assertRaises(ValueError):
+                    Metadata(*args)
+
+    def test_url(self):
+        meta = Metadata(UPSTREAM,
+                        'abcdef0123456789',
+                        'deadbeefdeadbeefdeadbeefdeadbeef',
+                        datetime(2018, 1, 9, 13, 10, 59),
+                        )
+        url = meta.url
+
+        self.assertEqual(url, 'https://github.com/Microsoft/vscode-debugadapter-node/raw/abcdef0123456789/debugProtocol.json')  # noqa
+
+    def test_format(self):
+        meta = Metadata('https://x.y.z/schema.json',
+                        'abcdef0123456789',
+                        'deadbeefdeadbeefdeadbeefdeadbeef',
+                        datetime(2018, 1, 9, 13, 10, 59),
+                        )
+        formatted = meta.format()
+
+        self.assertEqual(formatted, dedent("""\
+            upstream: https://x.y.z/schema.json
+            revision: abcdef0123456789
+            checksum: deadbeefdeadbeefdeadbeefdeadbeef
+            date: 2018-01-09 13:10:59 (UTC)
+            """))

tests/debugger_protocol/schema/test_upstream.py

@@ -1,23 +1,12 @@
 from datetime import datetime
 import io
-from textwrap import dedent
 import unittest
 
-from debugger_protocol.schema import UPSTREAM
-from debugger_protocol.schema.upstream import download, Metadata
-
-
-class Stringlike:
-
-    def __init__(self, value):
-        self.value = value
-
-    def __str__(self):
-        return self.value
-
-
-class Hash(Stringlike):
-    pass
+from .helpers import StubOpener
+from debugger_protocol.schema.file import SchemaFileError
+from debugger_protocol.schema.metadata import Metadata
+from debugger_protocol.schema.upstream import (
+    download, read)
 
 
 class DownloadTests(unittest.TestCase):
@@ -45,130 +34,27 @@ class DownloadTests(unittest.TestCase):
         self.assertEqual(rcvd, b'<a schema>')
 
 
-class MetadataTests(unittest.TestCase):
+class ReadSchemaTests(unittest.TestCase):
 
-    def test_parse_minimal(self):
-        expected = Metadata('https://x.y.z/schema.json',
-                            'abcdef0123456789',
-                            'deadbeefdeadbeefdeadbeefdeadbeef',
-                            datetime(2018, 1, 9, 13, 10, 59),
-                            )
-        meta = Metadata.parse(dedent("""
-            upstream: https://x.y.z/schema.json
-            revision: abcdef0123456789
-            checksum: deadbeefdeadbeefdeadbeefdeadbeef
-            date: 2018-01-09 13:10:59 (UTC)
-            """))
+    def test_success(self):
+        schemafile = io.BytesIO(b'<a schema>')
+        buf = io.BytesIO(
+            b'{"sha": "fc2395ca3564fb2afded8d90ddbe38dad1bf86f1"}')
+        opener = StubOpener(schemafile, buf)
+        data, meta = read('https://github.com/x/y/raw/master/z',
+                          _open_url=opener.open)
 
-        self.assertEqual(meta, expected)
+        self.assertEqual(data, b'<a schema>')
+        self.assertEqual(meta, Metadata(
+            'https://github.com/x/y/raw/master/z',
+            'fc2395ca3564fb2afded8d90ddbe38dad1bf86f1',
+            'e778c3751f9d0bceaf8d5aa81e2c659f',
+            meta.date,
+            ))
 
-    def test_parse_with_whitespace_and_comments(self):
-        expected = Metadata('https://x.y.z/schema.json',
-                            'abcdef0123456789',
-                            'deadbeefdeadbeefdeadbeefdeadbeef',
-                            datetime(2018, 1, 9, 13, 10, 59),
-                            )
-        meta = Metadata.parse(dedent("""
+    def test_resource_missing(self):
+        schemafile = None
+        opener = StubOpener(schemafile)
 
-            # generated by x.y.z
-            upstream: https://x.y.z/schema.json
-
-            revision: abcdef0123456789
-            checksum: deadbeefdeadbeefdeadbeefdeadbeef
-            date: 2018-01-09 13:10:59 (UTC)
-
-            # done!
-
-            """))  # noqa
-
-        self.assertEqual(meta, expected)
-
-    def test_parse_roundtrip_from_object(self):
-        orig = Metadata('https://x.y.z/schema.json',
-                        'abcdef0123456789',
-                        'deadbeefdeadbeefdeadbeefdeadbeef',
-                        datetime(2018, 1, 9, 13, 10, 59),
-                        )
-        meta = Metadata.parse(
-            orig.format())
-
-        self.assertEqual(meta, orig)
-
-    def test_parse_roundtrip_from_string(self):
-        orig = dedent("""\
-            upstream: https://x.y.z/schema.json
-            revision: abcdef0123456789
-            checksum: deadbeefdeadbeefdeadbeefdeadbeef
-            date: 2018-01-09 13:10:59 (UTC)
-            """).format(UPSTREAM)
-        data = (Metadata.parse(orig)
-                ).format()
-
-        self.assertEqual(data, orig)
-
-    def test_coercion_noop(self):
-        meta = Metadata('https://x.y.z/schema.json',
-                        'abcdef0123456789',
-                        'deadbeefdeadbeefdeadbeefdeadbeef',
-                        datetime(2018, 1, 9, 13, 10, 59),
-                        )
-
-        self.assertEqual(meta, (
-            'https://x.y.z/schema.json',
-            'abcdef0123456789',
-            'deadbeefdeadbeefdeadbeefdeadbeef',
-            datetime(2018, 1, 9, 13, 10, 59),
-            ))
-
-    def test_coercion_change_all(self):
-        meta = Metadata(Stringlike('https://x.y.z/schema.json'),
-                        Hash('abcdef0123456789'),
-                        Hash('deadbeefdeadbeefdeadbeefdeadbeef'),
-                        '2018-01-09 13:10:59 (UTC)',
-                        )
-
-        self.assertEqual(meta, (
-            'https://x.y.z/schema.json',
-            'abcdef0123456789',
-            'deadbeefdeadbeefdeadbeefdeadbeef',
-            datetime(2018, 1, 9, 13, 10, 59),
-            ))
-
-    def test_validation_fail(self):
-        baseargs = [
-            'https://x.y.z/schema.json',
-            'abcdef0123456789',
-            'deadbeefdeadbeefdeadbeefdeadbeef',
-            datetime(2018, 1, 9, 13, 10, 59),
-        ]
-        for i in range(len(baseargs)):
-            with self.subTest(baseargs[i]):
-                args = list(baseargs)
-                args[i] = ''
-                with self.assertRaises(ValueError):
-                    Metadata(*args)
-
-    def test_url(self):
-        meta = Metadata(UPSTREAM,
-                        'abcdef0123456789',
-                        'deadbeefdeadbeefdeadbeefdeadbeef',
-                        datetime(2018, 1, 9, 13, 10, 59),
-                        )
-        url = meta.url
-
-        self.assertEqual(url, 'https://github.com/Microsoft/vscode-debugadapter-node/raw/abcdef0123456789/debugProtocol.json')  # noqa
-
-    def test_format(self):
-        meta = Metadata('https://x.y.z/schema.json',
-                        'abcdef0123456789',
-                        'deadbeefdeadbeefdeadbeefdeadbeef',
-                        datetime(2018, 1, 9, 13, 10, 59),
-                        )
-        formatted = meta.format()
-
-        self.assertEqual(formatted, dedent("""\
-            upstream: https://x.y.z/schema.json
-            revision: abcdef0123456789
-            checksum: deadbeefdeadbeefdeadbeefdeadbeef
-            date: 2018-01-09 13:10:59 (UTC)
-            """))
+        with self.assertRaises(SchemaFileError):
+            read('schema.json', _open_url=opener.open)

tests/debugger_protocol/schema/test_vendored.py | 137 (new file)

@@ -0,0 +1,137 @@
+import io
+from textwrap import dedent
+import unittest
+
+from .helpers import StubOpener
+from debugger_protocol.schema.file import SchemaFileError
+from debugger_protocol.schema.metadata import MetadataError
+from debugger_protocol.schema.vendored import (
+    SchemaFileMismatchError, check_local, check_upstream)
+
+
+class CheckLocalTests(unittest.TestCase):
+
+    def test_match(self):
+        metafile = io.StringIO(dedent("""
+            upstream: https://x.y.z/schema.json
+            revision: abcdef0123456789
+            checksum: e778c3751f9d0bceaf8d5aa81e2c659f
+            date: 2018-01-09 13:10:59 (UTC)
+            """))
+        schemafile = io.BytesIO(b'<a schema>')
+        opener = StubOpener(metafile, schemafile)
+
+        # This does not fail.
+        check_local('schema.json', _open=opener.open)
+
+    def test_mismatch(self):
+        metafile = io.StringIO(dedent("""
+            upstream: https://x.y.z/schema.json
+            revision: abcdef0123456789
+            checksum: abc2
+            date: 2018-01-09 13:10:59 (UTC)
+            """))
+        schemafile = io.BytesIO(b'<a schema>')
+        opener = StubOpener(metafile, schemafile)
+
+        with self.assertRaises(SchemaFileMismatchError) as cm:
+            check_local('schema.json', _open=opener.open)
+        self.assertEqual(str(cm.exception),
+                         ('schema file \'schema.json\' does not match '
+                          'metadata file (checksum mismatch: '
+                          '\'e778c3751f9d0bceaf8d5aa81e2c659f\' != \'abc2\')'))
+
+    def test_metafile_missing(self):
+        metafile = None
+        schemafile = io.BytesIO(b'<a schema>')
+        opener = StubOpener(metafile, schemafile)
+
+        with self.assertRaises(MetadataError):
+            check_local('schema.json', _open=opener.open)
+
+    def test_metafile_invalid(self):
+        metafile = io.StringIO('<bogus>')
+        metafile.name = '/x/y/z/UPSTREAM'
+        schemafile = io.BytesIO(b'<a schema>')
+        opener = StubOpener(metafile, schemafile)
+
+        with self.assertRaises(MetadataError):
+            check_local('schema.json', _open=opener.open)
+
+    def test_schemafile_missing(self):
+        metafile = io.StringIO(dedent("""
+            upstream: https://x.y.z/schema.json
+            revision: abcdef0123456789
+            checksum: e778c3751f9d0bceaf8d5aa81e2c659f
+            date: 2018-01-09 13:10:59 (UTC)
+            """))
+        schemafile = None
+        opener = StubOpener(metafile, schemafile)
+
+        with self.assertRaises(SchemaFileError):
+            check_local('schema.json', _open=opener.open)
+
+
+class CheckUpstream(unittest.TestCase):
+
+    def test_match(self):
+        metafile = io.StringIO(dedent("""
+            upstream: https://github.com/x/y/raw/master/z
+            revision: fc2395ca3564fb2afded8d90ddbe38dad1bf86f1
+            checksum: e778c3751f9d0bceaf8d5aa81e2c659f
+            date: 2018-01-09 13:10:59 (UTC)
+            """))
+        schemafile = io.BytesIO(b'<a schema>')
+        buf = io.BytesIO(
+            b'{"sha": "fc2395ca3564fb2afded8d90ddbe38dad1bf86f1"}')
+        opener = StubOpener(metafile, schemafile, buf)
+
+        # This does not fail.
+        check_upstream('schema.json',
+                       _open=opener.open, _open_url=opener.open)
+
+    def test_revision_mismatch(self):
+        metafile = io.StringIO(dedent("""
+            upstream: https://github.com/x/y/raw/master/z
+            revision: abc2
+            checksum: e778c3751f9d0bceaf8d5aa81e2c659f
+            date: 2018-01-09 13:10:59 (UTC)
+            """))
+        schemafile = io.BytesIO(b'<a schema>')
+        buf = io.BytesIO(
+            b'{"sha": "fc2395ca3564fb2afded8d90ddbe38dad1bf86f1"}')
+        opener = StubOpener(metafile, schemafile, buf)
+
+        with self.assertRaises(SchemaFileMismatchError) as cm:
+            check_upstream('schema.json',
+                           _open=opener.open, _open_url=opener.open)
+        self.assertEqual(str(cm.exception),
+                         ('local schema file \'schema.json\' does not match '
+                          'upstream \'https://github.com/x/y/raw/master/z\' '
+                          '(revision mismatch: \'abc2\' != \'fc2395ca3564fb2afded8d90ddbe38dad1bf86f1\')'))  # noqa
+
+    def test_checksum_mismatch(self):
+        metafile = io.StringIO(dedent("""
+            upstream: https://github.com/x/y/raw/master/z
+            revision: fc2395ca3564fb2afded8d90ddbe38dad1bf86f1
+            checksum: abc2
+            date: 2018-01-09 13:10:59 (UTC)
+            """))
+        schemafile = io.BytesIO(b'<a schema>')
+        buf = io.BytesIO(
+            b'{"sha": "fc2395ca3564fb2afded8d90ddbe38dad1bf86f1"}')
+        opener = StubOpener(metafile, schemafile, buf)
+
+        with self.assertRaises(SchemaFileMismatchError) as cm:
+            check_upstream('schema.json',
+                           _open=opener.open, _open_url=opener.open)
+        self.assertEqual(str(cm.exception),
+                         ('local schema file \'schema.json\' does not match '
+                          'upstream \'https://github.com/x/y/raw/master/z\' '
+                          '(checksum mismatch: \'abc2\' != \'e778c3751f9d0bceaf8d5aa81e2c659f\')'))  # noqa
+
+    def test_metafile_missing(self):
+        ...
+
+    def test_url_resource_missing(self):
+        ...