mirror of
				https://github.com/django/django.git
				synced 2025-11-04 05:35:37 +00:00 
			
		
		
		
	git-svn-id: http://code.djangoproject.com/svn/django/trunk@14507 bcc190cf-cafb-0310-a4f2-bffc1f526a37
		
			
				
	
	
		
			253 lines
		
	
	
	
		
			8.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			253 lines
		
	
	
	
		
			8.6 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
from django.db import models, IntegrityError
 | 
						|
from django.test import TestCase, skipUnlessDBFeature, skipIfDBFeature
 | 
						|
 | 
						|
from modeltests.delete.models import (R, RChild, S, T, U, A, M, MR, MRNull,
 | 
						|
    create_a, get_default_r, User, Avatar, HiddenUser, HiddenUserProfile)
 | 
						|
 | 
						|
 | 
						|
class OnDeleteTests(TestCase):
 | 
						|
    def setUp(self):
 | 
						|
        self.DEFAULT = get_default_r()
 | 
						|
 | 
						|
    def test_auto(self):
 | 
						|
        a = create_a('auto')
 | 
						|
        a.auto.delete()
 | 
						|
        self.assertFalse(A.objects.filter(name='auto').exists())
 | 
						|
 | 
						|
    def test_auto_nullable(self):
 | 
						|
        a = create_a('auto_nullable')
 | 
						|
        a.auto_nullable.delete()
 | 
						|
        self.assertFalse(A.objects.filter(name='auto_nullable').exists())
 | 
						|
 | 
						|
    def test_setvalue(self):
 | 
						|
        a = create_a('setvalue')
 | 
						|
        a.setvalue.delete()
 | 
						|
        a = A.objects.get(pk=a.pk)
 | 
						|
        self.assertEqual(self.DEFAULT, a.setvalue)
 | 
						|
 | 
						|
    def test_setnull(self):
 | 
						|
        a = create_a('setnull')
 | 
						|
        a.setnull.delete()
 | 
						|
        a = A.objects.get(pk=a.pk)
 | 
						|
        self.assertEqual(None, a.setnull)
 | 
						|
 | 
						|
    def test_setdefault(self):
 | 
						|
        a = create_a('setdefault')
 | 
						|
        a.setdefault.delete()
 | 
						|
        a = A.objects.get(pk=a.pk)
 | 
						|
        self.assertEqual(self.DEFAULT, a.setdefault)
 | 
						|
 | 
						|
    def test_setdefault_none(self):
 | 
						|
        a = create_a('setdefault_none')
 | 
						|
        a.setdefault_none.delete()
 | 
						|
        a = A.objects.get(pk=a.pk)
 | 
						|
        self.assertEqual(None, a.setdefault_none)
 | 
						|
 | 
						|
    def test_cascade(self):
 | 
						|
        a = create_a('cascade')
 | 
						|
        a.cascade.delete()
 | 
						|
        self.assertFalse(A.objects.filter(name='cascade').exists())
 | 
						|
 | 
						|
    def test_cascade_nullable(self):
 | 
						|
        a = create_a('cascade_nullable')
 | 
						|
        a.cascade_nullable.delete()
 | 
						|
        self.assertFalse(A.objects.filter(name='cascade_nullable').exists())
 | 
						|
 | 
						|
    def test_protect(self):
 | 
						|
        a = create_a('protect')
 | 
						|
        self.assertRaises(IntegrityError, a.protect.delete)
 | 
						|
 | 
						|
    def test_do_nothing(self):
 | 
						|
        # Testing DO_NOTHING is a bit harder: It would raise IntegrityError for a normal model,
 | 
						|
        # so we connect to pre_delete and set the fk to a known value.
 | 
						|
        replacement_r = R.objects.create()
 | 
						|
        def check_do_nothing(sender, **kwargs):
 | 
						|
            obj = kwargs['instance']
 | 
						|
            obj.donothing_set.update(donothing=replacement_r)
 | 
						|
        models.signals.pre_delete.connect(check_do_nothing)
 | 
						|
        a = create_a('do_nothing')
 | 
						|
        a.donothing.delete()
 | 
						|
        a = A.objects.get(pk=a.pk)
 | 
						|
        self.assertEqual(replacement_r, a.donothing)
 | 
						|
        models.signals.pre_delete.disconnect(check_do_nothing)
 | 
						|
 | 
						|
    def test_inheritance_cascade_up(self):
 | 
						|
        child = RChild.objects.create()
 | 
						|
        child.delete()
 | 
						|
        self.assertFalse(R.objects.filter(pk=child.pk).exists())
 | 
						|
 | 
						|
    def test_inheritance_cascade_down(self):
 | 
						|
        child = RChild.objects.create()
 | 
						|
        parent = child.r_ptr
 | 
						|
        parent.delete()
 | 
						|
        self.assertFalse(RChild.objects.filter(pk=child.pk).exists())
 | 
						|
 | 
						|
    def test_cascade_from_child(self):
 | 
						|
        a = create_a('child')
 | 
						|
        a.child.delete()
 | 
						|
        self.assertFalse(A.objects.filter(name='child').exists())
 | 
						|
        self.assertFalse(R.objects.filter(pk=a.child_id).exists())
 | 
						|
 | 
						|
    def test_cascade_from_parent(self):
 | 
						|
        a = create_a('child')
 | 
						|
        R.objects.get(pk=a.child_id).delete()
 | 
						|
        self.assertFalse(A.objects.filter(name='child').exists())
 | 
						|
        self.assertFalse(RChild.objects.filter(pk=a.child_id).exists())
 | 
						|
 | 
						|
    def test_setnull_from_child(self):
 | 
						|
        a = create_a('child_setnull')
 | 
						|
        a.child_setnull.delete()
 | 
						|
        self.assertFalse(R.objects.filter(pk=a.child_setnull_id).exists())
 | 
						|
 | 
						|
        a = A.objects.get(pk=a.pk)
 | 
						|
        self.assertEqual(None, a.child_setnull)
 | 
						|
 | 
						|
    def test_setnull_from_parent(self):
 | 
						|
        a = create_a('child_setnull')
 | 
						|
        R.objects.get(pk=a.child_setnull_id).delete()
 | 
						|
        self.assertFalse(RChild.objects.filter(pk=a.child_setnull_id).exists())
 | 
						|
 | 
						|
        a = A.objects.get(pk=a.pk)
 | 
						|
        self.assertEqual(None, a.child_setnull)
 | 
						|
 | 
						|
    def test_o2o_setnull(self):
 | 
						|
        a = create_a('o2o_setnull')
 | 
						|
        a.o2o_setnull.delete()
 | 
						|
        a = A.objects.get(pk=a.pk)
 | 
						|
        self.assertEqual(None, a.o2o_setnull)
 | 
						|
 | 
						|
 | 
						|
class DeletionTests(TestCase):
 | 
						|
    def test_m2m(self):
 | 
						|
        m = M.objects.create()
 | 
						|
        r = R.objects.create()
 | 
						|
        MR.objects.create(m=m, r=r)
 | 
						|
        r.delete()
 | 
						|
        self.assertFalse(MR.objects.exists())
 | 
						|
 | 
						|
        r = R.objects.create()
 | 
						|
        MR.objects.create(m=m, r=r)
 | 
						|
        m.delete()
 | 
						|
        self.assertFalse(MR.objects.exists())
 | 
						|
 | 
						|
        m = M.objects.create()
 | 
						|
        r = R.objects.create()
 | 
						|
        m.m2m.add(r)
 | 
						|
        r.delete()
 | 
						|
        through = M._meta.get_field('m2m').rel.through
 | 
						|
        self.assertFalse(through.objects.exists())
 | 
						|
 | 
						|
        r = R.objects.create()
 | 
						|
        m.m2m.add(r)
 | 
						|
        m.delete()
 | 
						|
        self.assertFalse(through.objects.exists())
 | 
						|
 | 
						|
        m = M.objects.create()
 | 
						|
        r = R.objects.create()
 | 
						|
        MRNull.objects.create(m=m, r=r)
 | 
						|
        r.delete()
 | 
						|
        self.assertFalse(not MRNull.objects.exists())
 | 
						|
        self.assertFalse(m.m2m_through_null.exists())
 | 
						|
 | 
						|
    def test_bulk(self):
 | 
						|
        from django.db.models.sql.constants import GET_ITERATOR_CHUNK_SIZE
 | 
						|
        s = S.objects.create(r=R.objects.create())
 | 
						|
        for i in xrange(2*GET_ITERATOR_CHUNK_SIZE):
 | 
						|
            T.objects.create(s=s)
 | 
						|
        #   1 (select related `T` instances)
 | 
						|
        # + 1 (select related `U` instances)
 | 
						|
        # + 2 (delete `T` instances in batches)
 | 
						|
        # + 1 (delete `s`)
 | 
						|
        self.assertNumQueries(5, s.delete)
 | 
						|
        self.assertFalse(S.objects.exists())
 | 
						|
 | 
						|
    def test_instance_update(self):
 | 
						|
        deleted = []
 | 
						|
        related_setnull_sets = []
 | 
						|
        def pre_delete(sender, **kwargs):
 | 
						|
            obj = kwargs['instance']
 | 
						|
            deleted.append(obj)
 | 
						|
            if isinstance(obj, R):
 | 
						|
                related_setnull_sets.append(list(a.pk for a in obj.setnull_set.all()))
 | 
						|
 | 
						|
        models.signals.pre_delete.connect(pre_delete)
 | 
						|
        a = create_a('update_setnull')
 | 
						|
        a.setnull.delete()
 | 
						|
 | 
						|
        a = create_a('update_cascade')
 | 
						|
        a.cascade.delete()
 | 
						|
 | 
						|
        for obj in deleted:
 | 
						|
            self.assertEqual(None, obj.pk)
 | 
						|
 | 
						|
        for pk_list in related_setnull_sets:
 | 
						|
            for a in A.objects.filter(id__in=pk_list):
 | 
						|
                self.assertEqual(None, a.setnull)
 | 
						|
 | 
						|
        models.signals.pre_delete.disconnect(pre_delete)
 | 
						|
 | 
						|
    def test_deletion_order(self):
 | 
						|
        pre_delete_order = []
 | 
						|
        post_delete_order = []
 | 
						|
 | 
						|
        def log_post_delete(sender, **kwargs):
 | 
						|
            pre_delete_order.append((sender, kwargs['instance'].pk))
 | 
						|
 | 
						|
        def log_pre_delete(sender, **kwargs):
 | 
						|
            post_delete_order.append((sender, kwargs['instance'].pk))
 | 
						|
 | 
						|
        models.signals.post_delete.connect(log_post_delete)
 | 
						|
        models.signals.pre_delete.connect(log_pre_delete)
 | 
						|
 | 
						|
        r = R.objects.create(pk=1)
 | 
						|
        s1 = S.objects.create(pk=1, r=r)
 | 
						|
        s2 = S.objects.create(pk=2, r=r)
 | 
						|
        t1 = T.objects.create(pk=1, s=s1)
 | 
						|
        t2 = T.objects.create(pk=2, s=s2)
 | 
						|
        r.delete()
 | 
						|
        self.assertEqual(
 | 
						|
            pre_delete_order, [(T, 2), (T, 1), (S, 2), (S, 1), (R, 1)]
 | 
						|
        )
 | 
						|
        self.assertEqual(
 | 
						|
            post_delete_order, [(T, 1), (T, 2), (S, 1), (S, 2), (R, 1)]
 | 
						|
        )
 | 
						|
 | 
						|
        models.signals.post_delete.disconnect(log_post_delete)
 | 
						|
        models.signals.post_delete.disconnect(log_pre_delete)
 | 
						|
 | 
						|
    @skipUnlessDBFeature("can_defer_constraint_checks")
 | 
						|
    def test_can_defer_constraint_checks(self):
 | 
						|
        u = User.objects.create(
 | 
						|
            avatar=Avatar.objects.create()
 | 
						|
        )
 | 
						|
        a = Avatar.objects.get(pk=u.avatar_id)
 | 
						|
        # 1 query to find the users for the avatar.
 | 
						|
        # 1 query to delete the user
 | 
						|
        # 1 query to delete the avatar
 | 
						|
        # The important thing is that when we can defer constraint checks there
 | 
						|
        # is no need to do an UPDATE on User.avatar to null it out.
 | 
						|
        self.assertNumQueries(3, a.delete)
 | 
						|
        self.assertFalse(User.objects.exists())
 | 
						|
        self.assertFalse(Avatar.objects.exists())
 | 
						|
 | 
						|
    @skipIfDBFeature("can_defer_constraint_checks")
 | 
						|
    def test_cannot_defer_constraint_checks(self):
 | 
						|
        u = User.objects.create(
 | 
						|
            avatar=Avatar.objects.create()
 | 
						|
        )
 | 
						|
        a = Avatar.objects.get(pk=u.avatar_id)
 | 
						|
        # 1 query to find the users for the avatar.
 | 
						|
        # 1 query to delete the user
 | 
						|
        # 1 query to null out user.avatar, because we can't defer the constraint
 | 
						|
        # 1 query to delete the avatar
 | 
						|
        self.assertNumQueries(4, a.delete)
 | 
						|
        self.assertFalse(User.objects.exists())
 | 
						|
        self.assertFalse(Avatar.objects.exists())
 | 
						|
 | 
						|
    def test_hidden_related(self):
 | 
						|
        r = R.objects.create()
 | 
						|
        h = HiddenUser.objects.create(r=r)
 | 
						|
        p = HiddenUserProfile.objects.create(user=h)
 | 
						|
 | 
						|
        r.delete()
 | 
						|
        self.assertEqual(HiddenUserProfile.objects.count(), 0)
 |