Fixed #30581 -- Added support for Meta.constraints validation.

Thanks Simon Charette, Keryn Knight, and Mariusz Felisiak for reviews.
This commit is contained in:
Gagaro 2022-01-31 16:04:13 +01:00 committed by Mariusz Felisiak
parent 441103a04d
commit 667105877e
17 changed files with 852 additions and 88 deletions

View file

@ -1,11 +1,13 @@
import warnings
from django.contrib.postgres.indexes import OpClass
from django.db import NotSupportedError
from django.core.exceptions import ValidationError
from django.db import DEFAULT_DB_ALIAS, NotSupportedError
from django.db.backends.ddl_references import Expressions, Statement, Table
from django.db.models import BaseConstraint, Deferrable, F, Q
from django.db.models.expressions import ExpressionList
from django.db.models.expressions import Exists, ExpressionList
from django.db.models.indexes import IndexExpression
from django.db.models.lookups import PostgresOperatorLookup
from django.db.models.sql import Query
from django.utils.deprecation import RemovedInDjango50Warning
@ -32,6 +34,7 @@ class ExclusionConstraint(BaseConstraint):
deferrable=None,
include=None,
opclasses=(),
violation_error_message=None,
):
if index_type and index_type.lower() not in {"gist", "spgist"}:
raise ValueError(
@ -78,7 +81,7 @@ class ExclusionConstraint(BaseConstraint):
category=RemovedInDjango50Warning,
stacklevel=2,
)
super().__init__(name=name)
super().__init__(name=name, violation_error_message=violation_error_message)
def _get_expressions(self, schema_editor, query):
expressions = []
@ -197,3 +200,44 @@ class ExclusionConstraint(BaseConstraint):
"" if not self.include else " include=%s" % repr(self.include),
"" if not self.opclasses else " opclasses=%s" % repr(self.opclasses),
)
def validate(self, model, instance, exclude=None, using=DEFAULT_DB_ALIAS):
queryset = model._default_manager.using(using)
replacement_map = instance._get_field_value_map(
meta=model._meta, exclude=exclude
)
lookups = []
for idx, (expression, operator) in enumerate(self.expressions):
if isinstance(expression, str):
expression = F(expression)
if isinstance(expression, F):
if exclude and expression.name in exclude:
return
rhs_expression = replacement_map.get(expression.name, expression)
else:
rhs_expression = expression.replace_references(replacement_map)
if exclude:
for expr in rhs_expression.flatten():
if isinstance(expr, F) and expr.name in exclude:
return
# Remove OpClass because it only has sense during the constraint
# creation.
if isinstance(expression, OpClass):
expression = expression.get_source_expressions()[0]
if isinstance(rhs_expression, OpClass):
rhs_expression = rhs_expression.get_source_expressions()[0]
lookup = PostgresOperatorLookup(lhs=expression, rhs=rhs_expression)
lookup.postgres_operator = operator
lookups.append(lookup)
queryset = queryset.filter(*lookups)
model_class_pk = instance._get_pk_val(model._meta)
if not instance._state.adding and model_class_pk is not None:
queryset = queryset.exclude(pk=model_class_pk)
if not self.condition:
if queryset.exists():
raise ValidationError(self.get_violation_error_message())
else:
if (self.condition & Exists(queryset.filter(self.condition))).check(
replacement_map, using=using
):
raise ValidationError(self.get_violation_error_message())