Project

General

Profile

Statistics
| Branch: | Revision:

root / env / lib / python2.7 / site-packages / south / tests / db.py @ d1a4905f

History | View | Annotate | Download (33.7 KB)

1
import datetime
2

    
3
from south.db import db, generic
4
from django.db import connection, models, IntegrityError
5

    
6
from south.tests import unittest, skipUnless        
7

    
8
# Create a list of error classes from the various database libraries
9
errors = []
10
try:
11
    from psycopg2 import ProgrammingError
12
    errors.append(ProgrammingError)
13
except ImportError:
14
    pass
15
errors = tuple(errors)
16

    
17
try:
18
    from south.db import mysql
19
except ImportError:
20
    mysql = None
21

    
22

    
23
class TestOperations(unittest.TestCase):
24

    
25
    """
26
    Tests if the various DB abstraction calls work.
27
    Can only test a limited amount due to DB differences.
28
    """
29

    
30
    def setUp(self):
31
        db.debug = False
32
        db.clear_deferred_sql()
33
        db.start_transaction()
34
    
35
    def tearDown(self):
36
        db.rollback_transaction()
37

    
38
    def test_create(self):
39
        """
40
        Test creation of tables.
41
        """
42
        cursor = connection.cursor()
43
        # It needs to take at least 2 args
44
        self.assertRaises(TypeError, db.create_table)
45
        self.assertRaises(TypeError, db.create_table, "test1")
46
        # Empty tables (i.e. no columns) are not fine, so make at least 1
47
        db.create_table("test1", [('email_confirmed', models.BooleanField(default=False))])
48
        # And should exist
49
        cursor.execute("SELECT * FROM test1")
50
        # Make sure we can't do the same query on an empty table
51
        try:
52
            cursor.execute("SELECT * FROM nottheretest1")
53
        except:
54
            pass
55
        else:
56
            self.fail("Non-existent table could be selected!")
57
    
58
    def test_delete(self):
59
        """
60
        Test deletion of tables.
61
        """
62
        cursor = connection.cursor()
63
        db.create_table("test_deltable", [('email_confirmed', models.BooleanField(default=False))])
64
        db.delete_table("test_deltable")
65
        # Make sure it went
66
        try:
67
            cursor.execute("SELECT * FROM test_deltable")
68
        except:
69
            pass
70
        else:
71
            self.fail("Just-deleted table could be selected!")
72
    
73
    def test_nonexistent_delete(self):
74
        """
75
        Test deletion of nonexistent tables.
76
        """
77
        try:
78
            db.delete_table("test_nonexistdeltable")
79
        except:
80
            pass
81
        else:
82
            self.fail("Non-existent table could be deleted!")
83
    
84
    def test_foreign_keys(self):
85
        """
86
        Tests foreign key creation, especially uppercase (see #61)
87
        """
88
        Test = db.mock_model(model_name='Test', db_table='test5a',
89
                             db_tablespace='', pk_field_name='ID',
90
                             pk_field_type=models.AutoField, pk_field_args=[])
91
        db.create_table("test5a", [('ID', models.AutoField(verbose_name='ID', primary_key=True, auto_created=True))])
92
        db.create_table("test5b", [
93
            ('id', models.AutoField(verbose_name='ID', primary_key=True, auto_created=True)),
94
            ('UNIQUE', models.ForeignKey(Test)),
95
        ])
96
        db.execute_deferred_sql()
97
        
98
    @skipUnless(db.supports_foreign_keys, 'Foreign keys can only be deleted on '
99
                                          'engines that support them.')
100
    def test_recursive_foreign_key_delete(self):
101
        """
102
        Test that recursive foreign keys are deleted correctly (see #1065)
103
        """
104
        Test = db.mock_model(model_name='Test', db_table='test_rec_fk_del',
105
                             db_tablespace='', pk_field_name='id',
106
                             pk_field_type=models.AutoField, pk_field_args=[])
107
        db.create_table('test_rec_fk_del', [
108
            ('id', models.AutoField(primary_key=True, auto_created=True)),
109
            ('fk', models.ForeignKey(Test)),
110
        ])
111
        db.execute_deferred_sql()
112
        db.delete_foreign_key('test_rec_fk_del', 'fk_id')
113
    
114
    def test_rename(self):
115
        """
116
        Test column renaming
117
        """
118
        cursor = connection.cursor()
119
        db.create_table("test_rn", [('spam', models.BooleanField(default=False))])
120
        # Make sure we can select the column
121
        cursor.execute("SELECT spam FROM test_rn")
122
        # Rename it
123
        db.rename_column("test_rn", "spam", "eggs")
124
        cursor.execute("SELECT eggs FROM test_rn")
125
        db.commit_transaction()
126
        db.start_transaction()
127
        try:
128
            cursor.execute("SELECT spam FROM test_rn")
129
        except:
130
            pass
131
        else:
132
            self.fail("Just-renamed column could be selected!")
133
        db.rollback_transaction()
134
        db.delete_table("test_rn")
135
        db.start_transaction()
136
    
137
    def test_dry_rename(self):
138
        """
139
        Test column renaming while --dry-run is turned on (should do nothing)
140
        See ticket #65
141
        """
142
        cursor = connection.cursor()
143
        db.create_table("test_drn", [('spam', models.BooleanField(default=False))])
144
        # Make sure we can select the column
145
        cursor.execute("SELECT spam FROM test_drn")
146
        # Rename it
147
        db.dry_run = True
148
        db.rename_column("test_drn", "spam", "eggs")
149
        db.dry_run = False
150
        cursor.execute("SELECT spam FROM test_drn")
151
        db.commit_transaction()
152
        db.start_transaction()
153
        try:
154
            cursor.execute("SELECT eggs FROM test_drn")
155
        except:
156
            pass
157
        else:
158
            self.fail("Dry-renamed new column could be selected!")
159
        db.rollback_transaction()
160
        db.delete_table("test_drn")
161
        db.start_transaction()
162
    
163
    def test_table_rename(self):
164
        """
165
        Test column renaming
166
        """
167
        cursor = connection.cursor()
168
        db.create_table("testtr", [('spam', models.BooleanField(default=False))])
169
        # Make sure we can select the column
170
        cursor.execute("SELECT spam FROM testtr")
171
        # Rename it
172
        db.rename_table("testtr", "testtr2")
173
        cursor.execute("SELECT spam FROM testtr2")
174
        db.commit_transaction()
175
        db.start_transaction()
176
        try:
177
            cursor.execute("SELECT spam FROM testtr")
178
        except:
179
            pass
180
        else:
181
            self.fail("Just-renamed column could be selected!")
182
        db.rollback_transaction()
183
        db.delete_table("testtr2")
184
        db.start_transaction()
185
    
186
    def test_percents_in_defaults(self):
187
        """
188
        Test that % in a default gets escaped to %%.
189
        """
190
        try:
191
            db.create_table("testpind", [('cf', models.CharField(max_length=255, default="It should be 2%!"))])
192
        except IndexError:
193
            self.fail("% was not properly escaped in column SQL.")
194
        db.delete_table("testpind")
195
    
196
    def test_index(self):
197
        """
198
        Test the index operations
199
        """
200
        db.create_table("test3", [
201
            ('SELECT', models.BooleanField(default=False)),
202
            ('eggs', models.IntegerField(unique=True)),
203
        ])
204
        db.execute_deferred_sql()
205
        # Add an index on that column
206
        db.create_index("test3", ["SELECT"])
207
        # Add another index on two columns
208
        db.create_index("test3", ["SELECT", "eggs"])
209
        # Delete them both
210
        db.delete_index("test3", ["SELECT"])
211
        db.delete_index("test3", ["SELECT", "eggs"])
212
        # Delete the unique index/constraint
213
        if db.backend_name != "sqlite3":
214
            db.delete_unique("test3", ["eggs"])
215
        db.delete_table("test3")
216
    
217
    def test_primary_key(self):
218
        """
219
        Test the primary key operations
220
        """
221
        
222
        db.create_table("test_pk", [
223
            ('id', models.IntegerField(primary_key=True)),
224
            ('new_pkey', models.IntegerField()),
225
            ('eggs', models.IntegerField(unique=True)),
226
        ])
227
        db.execute_deferred_sql()
228
        # Remove the default primary key, and make eggs it
229
        db.delete_primary_key("test_pk")
230
        db.create_primary_key("test_pk", "new_pkey")
231
        # Try inserting a now-valid row pair
232
        db.execute("INSERT INTO test_pk (id, new_pkey, eggs) VALUES (1, 2, 3)")
233
        db.execute("INSERT INTO test_pk (id, new_pkey, eggs) VALUES (1, 3, 4)")
234
        db.delete_table("test_pk")
235
    
236
    def test_primary_key_implicit(self):
237
        """
238
        Tests that changing primary key implicitly fails.
239
        """
240
        db.create_table("test_pki", [
241
            ('id', models.IntegerField(primary_key=True)),
242
            ('new_pkey', models.IntegerField()),
243
            ('eggs', models.IntegerField(unique=True)),
244
        ])
245
        db.execute_deferred_sql()
246
        # Fiddle with alter_column to attempt to make it remove the primary key
247
        db.alter_column("test_pki", "id", models.IntegerField())
248
        db.alter_column("test_pki", "new_pkey", models.IntegerField(primary_key=True))
249
        # Try inserting a should-be-valid row pair
250
        db.execute("INSERT INTO test_pki (id, new_pkey, eggs) VALUES (1, 2, 3)")
251
        db.execute("INSERT INTO test_pki (id, new_pkey, eggs) VALUES (2, 2, 4)")
252
        db.delete_table("test_pki")
253
    
254
    def test_add_columns(self):
255
        """
256
        Test adding columns
257
        """
258
        db.create_table("test_addc", [
259
            ('spam', models.BooleanField(default=False)),
260
            ('eggs', models.IntegerField()),
261
        ])
262
        # Add a column
263
        db.add_column("test_addc", "add1", models.IntegerField(default=3), keep_default=False)
264
        # Add a FK with keep_default=False (#69)
265
        User = db.mock_model(model_name='User', db_table='auth_user', db_tablespace='', pk_field_name='id', pk_field_type=models.AutoField, pk_field_args=[], pk_field_kwargs={})
266
        # insert some data so we can test the default value of the added fkey
267
        db.execute("INSERT INTO test_addc (spam, eggs, add1) VALUES (%s, 1, 2)", [False])
268
        db.add_column("test_addc", "user", models.ForeignKey(User, null=True), keep_default=False)
269
        db.execute_deferred_sql()
270
        # try selecting from the user_id column to make sure it was actually created
271
        val = db.execute("SELECT user_id FROM test_addc")[0][0]
272
        self.assertEquals(val, None)
273
        db.delete_column("test_addc", "add1")
274
        db.delete_table("test_addc")
275

    
276
    def test_add_nullbool_column(self):
277
        """
278
        Test adding NullBoolean columns
279
        """
280
        db.create_table("test_addnbc", [
281
            ('spam', models.BooleanField(default=False)),
282
            ('eggs', models.IntegerField()),
283
        ])
284
        # Add a column
285
        db.add_column("test_addnbc", "add1", models.NullBooleanField())
286
        # Add a column with a default
287
        db.add_column("test_addnbc", "add2", models.NullBooleanField(default=True))
288
        # insert some data so we can test the default values of the added column
289
        db.execute("INSERT INTO test_addnbc (spam, eggs) VALUES (%s, 1)", [False])
290
        # try selecting from the new columns to make sure they were properly created
291
        false, null, true = db.execute("SELECT spam,add1,add2 FROM test_addnbc")[0][0:3]
292
        self.assertTrue(true)
293
        self.assertEquals(null, None)
294
        self.assertEquals(false, False)
295
        db.delete_table("test_addnbc")
296
    
297
    def test_alter_columns(self):
298
        """
299
        Test altering columns
300
        """
301
        db.create_table("test_alterc", [
302
            ('spam', models.BooleanField(default=False)),
303
            ('eggs', models.IntegerField()),
304
        ])
305
        db.execute_deferred_sql()
306
        # Change eggs to be a FloatField
307
        db.alter_column("test_alterc", "eggs", models.FloatField())
308
        db.execute_deferred_sql()
309
        db.delete_table("test_alterc")
310
        db.execute_deferred_sql()
311
    
312
    def test_alter_char_default(self):
313
        """
314
        Test altering column defaults with char fields
315
        """
316
        db.create_table("test_altercd", [
317
            ('spam', models.CharField(max_length=30)),
318
            ('eggs', models.IntegerField()),
319
        ])
320
        # Change spam default
321
        db.alter_column("test_altercd", "spam", models.CharField(max_length=30, default="loof"))
322
        
323
    def test_mysql_defaults(self):
324
        """
325
        Test MySQL default handling for BLOB and TEXT.
326
        """
327
        db.create_table("test_altermyd", [
328
            ('spam', models.BooleanField(default=False)),
329
            ('eggs', models.TextField()),
330
        ])
331
        # Change eggs to be a FloatField
332
        db.alter_column("test_altermyd", "eggs", models.TextField(null=True))
333
        db.delete_table("test_altermyd")
334
    
335
    def test_alter_column_postgres_multiword(self):
336
        """
337
        Tests altering columns with multiple words in Postgres types (issue #125)
338
        e.g. 'datetime with time zone', look at django/db/backends/postgresql/creation.py
339
        """
340
        db.create_table("test_multiword", [
341
            ('col_datetime', models.DateTimeField(null=True)),
342
            ('col_integer', models.PositiveIntegerField(null=True)),
343
            ('col_smallint', models.PositiveSmallIntegerField(null=True)),
344
            ('col_float', models.FloatField(null=True)),
345
        ])
346
        
347
        # test if 'double precision' is preserved
348
        db.alter_column('test_multiword', 'col_float', models.FloatField('float', null=True))
349

    
350
        # test if 'CHECK ("%(column)s" >= 0)' is stripped
351
        db.alter_column('test_multiword', 'col_integer', models.PositiveIntegerField(null=True))
352
        db.alter_column('test_multiword', 'col_smallint', models.PositiveSmallIntegerField(null=True))
353

    
354
        # test if 'with timezone' is preserved
355
        if db.backend_name == "postgres":
356
            db.execute("INSERT INTO test_multiword (col_datetime) VALUES ('2009-04-24 14:20:55+02')")
357
            db.alter_column('test_multiword', 'col_datetime', models.DateTimeField(auto_now=True))
358
            assert db.execute("SELECT col_datetime = '2009-04-24 14:20:55+02' FROM test_multiword")[0][0]
359

    
360
        db.delete_table("test_multiword")
361
    
362
    def test_alter_constraints(self):
363
        """
364
        Tests that going from a PostiveIntegerField to an IntegerField drops
365
        the constraint on the database.
366
        """
367
        # Only applies to databases that support CHECK constraints
368
        if not db.has_check_constraints:
369
            return
370
        # Make the test table
371
        db.create_table("test_alterc", [
372
            ('num', models.PositiveIntegerField()),
373
        ])
374
        db.execute_deferred_sql()
375
        # Add in some test values
376
        db.execute("INSERT INTO test_alterc (num) VALUES (1)")
377
        db.execute("INSERT INTO test_alterc (num) VALUES (2)")
378
        # Ensure that adding a negative number is bad
379
        db.commit_transaction()
380
        db.start_transaction()
381
        try:
382
            db.execute("INSERT INTO test_alterc (num) VALUES (-3)")
383
        except:
384
            db.rollback_transaction()
385
        else:
386
            self.fail("Could insert a negative integer into a PositiveIntegerField.")
387
        # Alter it to a normal IntegerField
388
        db.alter_column("test_alterc", "num", models.IntegerField())
389
        db.execute_deferred_sql()
390
        # It should now work
391
        db.execute("INSERT INTO test_alterc (num) VALUES (-3)")
392
        db.delete_table("test_alterc")
393
        # We need to match up for tearDown
394
        db.start_transaction()
395
    
396
    def test_unique(self):
397
        """
398
        Tests creating/deleting unique constraints.
399
        """
400
        
401
        # SQLite backend doesn't support this yet.
402
        if db.backend_name == "sqlite3":
403
            return
404
        
405
        db.create_table("test_unique2", [
406
            ('id', models.AutoField(primary_key=True)),
407
        ])
408
        db.create_table("test_unique", [
409
            ('spam', models.BooleanField(default=False)),
410
            ('eggs', models.IntegerField()),
411
            ('ham', models.ForeignKey(db.mock_model('Unique2', 'test_unique2'))),
412
        ])
413
        db.execute_deferred_sql()
414
        # Add a constraint
415
        db.create_unique("test_unique", ["spam"])
416
        db.execute_deferred_sql()
417
        # Shouldn't do anything during dry-run
418
        db.dry_run = True
419
        db.delete_unique("test_unique", ["spam"])
420
        db.dry_run = False
421
        db.delete_unique("test_unique", ["spam"])
422
        db.create_unique("test_unique", ["spam"])
423
        # Special preparations for Sql Server
424
        if db.backend_name == "pyodbc":
425
            db.execute("SET IDENTITY_INSERT test_unique2 ON;")
426
        db.execute("INSERT INTO test_unique2 (id) VALUES (1)")
427
        db.execute("INSERT INTO test_unique2 (id) VALUES (2)")
428
        db.commit_transaction()
429
        db.start_transaction()
430

    
431
        
432
        # Test it works
433
        TRUE = (True,)
434
        FALSE = (False,)
435
        db.execute("INSERT INTO test_unique (spam, eggs, ham_id) VALUES (%s, 0, 1)", TRUE)
436
        db.execute("INSERT INTO test_unique (spam, eggs, ham_id) VALUES (%s, 1, 2)", FALSE)
437
        try:
438
            db.execute("INSERT INTO test_unique (spam, eggs, ham_id) VALUES (%s, 2, 1)", FALSE)
439
        except:
440
            db.rollback_transaction()
441
        else:
442
            self.fail("Could insert non-unique item.")
443
        
444
        # Drop that, add one only on eggs
445
        db.delete_unique("test_unique", ["spam"])
446
        db.execute("DELETE FROM test_unique")
447
        db.create_unique("test_unique", ["eggs"])
448
        db.start_transaction()
449
        
450
        # Test similarly
451
        db.execute("INSERT INTO test_unique (spam, eggs, ham_id) VALUES (%s, 0, 1)", TRUE)
452
        db.execute("INSERT INTO test_unique (spam, eggs, ham_id) VALUES (%s, 1, 2)", FALSE)
453
        try:
454
            db.execute("INSERT INTO test_unique (spam, eggs, ham_id) VALUES (%s, 1, 1)", TRUE)
455
        except:
456
            db.rollback_transaction()
457
        else:
458
            self.fail("Could insert non-unique item.")
459
        
460
        # Drop those, test combined constraints
461
        db.delete_unique("test_unique", ["eggs"])
462
        db.execute("DELETE FROM test_unique")
463
        db.create_unique("test_unique", ["spam", "eggs", "ham_id"])
464
        db.start_transaction()
465
        # Test similarly
466
        db.execute("INSERT INTO test_unique (spam, eggs, ham_id) VALUES (%s, 0, 1)", TRUE)
467
        db.execute("INSERT INTO test_unique (spam, eggs, ham_id) VALUES (%s, 1, 1)", FALSE)
468
        try:
469
            db.execute("INSERT INTO test_unique (spam, eggs, ham_id) VALUES (%s, 0, 1)", TRUE)
470
        except:
471
            db.rollback_transaction()
472
        else:
473
            self.fail("Could insert non-unique pair.")
474
        db.delete_unique("test_unique", ["spam", "eggs", "ham_id"])
475
        db.start_transaction()
476
    
477
    def test_alter_unique(self):
478
        """
479
        Tests that unique constraints are not affected when
480
        altering columns (that's handled by create_/delete_unique)
481
        """
482
        db.create_table("test_alter_unique", [
483
            ('spam', models.IntegerField()),
484
            ('eggs', models.IntegerField(unique=True)),
485
        ])
486
        db.execute_deferred_sql()
487
        
488
        # Make sure the unique constraint is created
489
        db.execute('INSERT INTO test_alter_unique (spam, eggs) VALUES (0, 42)')
490
        db.commit_transaction()
491
        db.start_transaction()
492
        try:
493
            db.execute("INSERT INTO test_alter_unique (spam, eggs) VALUES (1, 42)")
494
        except:
495
            pass
496
        else:
497
            self.fail("Could insert the same integer twice into a unique field.")
498
        db.rollback_transaction()
499

    
500
        # Alter without unique=True (should not affect anything)
501
        db.alter_column("test_alter_unique", "eggs", models.IntegerField())
502

    
503
        # Insertion should still fail
504
        db.start_transaction()
505
        try:
506
            db.execute("INSERT INTO test_alter_unique (spam, eggs) VALUES (1, 42)")
507
        except:
508
            pass
509
        else:
510
            self.fail("Could insert the same integer twice into a unique field after alter_column with unique=False.")
511
        db.rollback_transaction()
512
        
513
        # Delete the unique index/constraint
514
        if db.backend_name != "sqlite3":
515
            db.delete_unique("test_alter_unique", ["eggs"])
516
        db.delete_table("test_alter_unique")
517
        db.start_transaction()
518

    
519
    def test_capitalised_constraints(self):
520
        """
521
        Under PostgreSQL at least, capitalised constraints must be quoted.
522
        """
523
        db.create_table("test_capconst", [
524
            ('SOMECOL', models.PositiveIntegerField(primary_key=True)),
525
        ])
526
        # Alter it so it's not got the check constraint
527
        db.alter_column("test_capconst", "SOMECOL", models.IntegerField())
528
    
529
    def test_text_default(self):
530
        """
531
        MySQL cannot have blank defaults on TEXT columns.
532
        """
533
        db.create_table("test_textdef", [
534
            ('textcol', models.TextField(blank=True)),
535
        ])
536

    
537
    def test_text_to_char(self):
538
        """
539
        On Oracle, you can't simply ALTER TABLE MODIFY a textfield to a charfield
540
        """
541
        value = "kawabanga"
542
        db.create_table("test_text_to_char", [
543
            ('textcol', models.TextField()),
544
        ])
545
        db.execute_deferred_sql()
546
        db.execute("INSERT INTO test_text_to_char VALUES (%s)", [value])
547
        db.alter_column("test_text_to_char", "textcol", models.CharField(max_length=100))
548
        db.execute_deferred_sql()
549
        after = db.execute("select * from test_text_to_char")[0][0]
550
        self.assertEqual(value, after, "Change from text to char altered value [ %s != %s ]" % (`value`,`after`))
551

    
552
    def test_char_to_text(self):
553
        """
554
        On Oracle, you can't simply ALTER TABLE MODIFY a charfield to a textfield either
555
        """
556
        value = "agnabawak"
557
        db.create_table("test_char_to_text", [
558
            ('textcol', models.CharField(max_length=100)),
559
        ])
560
        db.execute_deferred_sql()
561
        db.execute("INSERT INTO test_char_to_text VALUES (%s)", [value])
562
        db.alter_column("test_char_to_text", "textcol", models.TextField())
563
        db.execute_deferred_sql()
564
        after = db.execute("select * from test_char_to_text")[0][0]
565
        after = unicode(after) # Oracle text fields return a sort of lazy string -- force evaluation
566
        self.assertEqual(value, after, "Change from char to text altered value [ %s != %s ]" % (`value`,`after`))
567

    
568
    def test_datetime_default(self):
569
        """
570
        Test that defaults are created correctly for datetime columns
571
        """
572
        end_of_world = datetime.datetime(2012, 12, 21, 0, 0, 1)
573

    
574
        try:
575
            from django.utils import timezone
576
        except ImportError:
577
            pass
578
        else:
579
            from django.conf import settings
580
            if getattr(settings, 'USE_TZ', False):
581
                end_of_world = end_of_world.replace(tzinfo=timezone.utc)
582

    
583
        db.create_table("test_datetime_def", [
584
            ('col0', models.IntegerField(null=True)),
585
            ('col1', models.DateTimeField(default=end_of_world)),
586
            ('col2', models.DateTimeField(null=True)),
587
        ])
588
        db.execute_deferred_sql()
589
        db.alter_column("test_datetime_def", "col2", models.DateTimeField(default=end_of_world))
590
        db.add_column("test_datetime_def", "col3", models.DateTimeField(default=end_of_world))
591
        db.execute_deferred_sql()
592
        # There should not be a default in the database for col1
593
        db.commit_transaction()
594
        db.start_transaction()
595
        self.assertRaises(
596
            IntegrityError,
597
            db.execute, "insert into test_datetime_def (col0) values (null)"
598
        )
599
        db.rollback_transaction()
600
        db.start_transaction()
601
        # There should be for the others
602
        db.execute("insert into test_datetime_def (col0, col1) values (null, %s)", [end_of_world])
603
        ends = db.execute("select col1,col2,col3 from test_datetime_def")[0]
604
        self.failUnlessEqual(len(ends), 3)
605
        for e in ends:
606
            self.failUnlessEqual(e, end_of_world)
607
        
608
    def test_add_unique_fk(self):
609
        """
610
        Test adding a ForeignKey with unique=True or a OneToOneField
611
        """
612
        db.create_table("test_add_unique_fk", [
613
            ('spam', models.BooleanField(default=False))
614
        ])
615
        
616
        db.add_column("test_add_unique_fk", "mock1", models.ForeignKey(db.mock_model('Mock', 'mock'), null=True, unique=True))
617
        db.add_column("test_add_unique_fk", "mock2", models.OneToOneField(db.mock_model('Mock', 'mock'), null=True))
618
        
619
        db.delete_table("test_add_unique_fk")
620
        
621
    def test_column_constraint(self):
622
        """
623
        Tests that the value constraint of PositiveIntegerField is enforced on
624
        the database level.
625
        """
626
        if not db.has_check_constraints:
627
            return
628
        
629
        db.create_table("test_column_constraint", [
630
            ('spam', models.PositiveIntegerField()),
631
        ])
632
        db.execute_deferred_sql()
633
        
634
        # Make sure we can't insert negative values
635
        db.commit_transaction()
636
        db.start_transaction()
637
        try:
638
            db.execute("INSERT INTO test_column_constraint VALUES (-42)")
639
        except:
640
            pass
641
        else:
642
            self.fail("Could insert a negative value into a PositiveIntegerField.")
643
        db.rollback_transaction()
644
        
645
        # remove constraint
646
        db.alter_column("test_column_constraint", "spam", models.IntegerField())
647
        db.execute_deferred_sql()
648
        # make sure the insertion works now
649
        db.execute('INSERT INTO test_column_constraint VALUES (-42)')
650
        db.execute('DELETE FROM test_column_constraint')
651
        
652
        # add it back again
653
        db.alter_column("test_column_constraint", "spam", models.PositiveIntegerField())
654
        db.execute_deferred_sql()
655
        # it should fail again
656
        db.start_transaction()
657
        try:
658
            db.execute("INSERT INTO test_column_constraint VALUES (-42)")
659
        except:
660
            pass
661
        else:
662
            self.fail("Could insert a negative value after changing an IntegerField to a PositiveIntegerField.")
663
        db.rollback_transaction()
664
        
665
        db.delete_table("test_column_constraint")
666
        db.start_transaction()
667

    
668
    def test_sql_defaults(self):
669
        """
670
        Test that sql default value is correct for non-string field types.
671
        Datetimes are handled in test_datetime_default.
672
        """
673

    
674
        class CustomField(models.CharField):
675
            __metaclass__ = models.SubfieldBase
676
            description = 'CustomField'
677
            def get_default(self):
678
                if self.has_default():
679
                    if callable(self.default):
680
                        return self.default()
681
                    return self.default
682
                return super(CustomField, self).get_default()
683
            def get_prep_value(self, value):
684
                if not value:
685
                    return value
686
                return ','.join(map(str, value))
687
            def to_python(self, value):
688
                if not value or isinstance(value, list):
689
                    return value
690
                return map(int, value.split(','))
691

    
692
        false_value = db.has_booleans and 'False' or '0'
693
        defaults = (
694
            (models.CharField(default='sukasuka'), 'DEFAULT \'sukasuka'),
695
            (models.BooleanField(default=False), 'DEFAULT %s' % false_value),
696
            (models.IntegerField(default=42), 'DEFAULT 42'),
697
            (CustomField(default=[2012, 2018, 2021, 2036]), 'DEFAULT \'2012,2018,2021,2036')
698
        )
699
        for field, sql_test_str in defaults:
700
            sql = db.column_sql('fish', 'YAAAAAAZ', field)
701
            if sql_test_str not in sql:
702
                self.fail("default sql value was not properly generated for field %r.\nSql was %s" % (field, sql))
703

    
704
    def test_make_added_foreign_key_not_null(self):
705
        # Table for FK to target
706
        User = db.mock_model(model_name='User', db_table='auth_user', db_tablespace='', pk_field_name='id', pk_field_type=models.AutoField, pk_field_args=[], pk_field_kwargs={})
707
        # Table with no foreign key
708
        db.create_table("test_fk", [
709
            ('eggs', models.IntegerField()),
710
        ])
711
        db.execute_deferred_sql()
712
        
713
        # Add foreign key
714
        db.add_column("test_fk", 'foreik', models.ForeignKey(User, null=True),
715
                      keep_default = False)
716
        db.execute_deferred_sql()
717
        
718
        # Make the FK null
719
        db.alter_column("test_fk", "foreik_id", models.ForeignKey(User))
720
        db.execute_deferred_sql()
721

    
722
class TestCacheGeneric(unittest.TestCase):
723
    base_ops_cls = generic.DatabaseOperations
724
    def setUp(self):
725
        class CacheOps(self.base_ops_cls):
726
            def __init__(self):
727
                self._constraint_cache = {}
728
                self.cache_filled = 0
729
                self.settings = {'NAME': 'db'}
730

    
731
            def _fill_constraint_cache(self, db, table):
732
                self.cache_filled += 1
733
                self._constraint_cache.setdefault(db, {})
734
                self._constraint_cache[db].setdefault(table, {})
735

    
736
            @generic.invalidate_table_constraints
737
            def clear_con(self, table):
738
                pass
739

    
740
            @generic.copy_column_constraints
741
            def cp_column(self, table, column_old, column_new):
742
                pass
743

    
744
            @generic.delete_column_constraints
745
            def rm_column(self, table, column):
746
                pass
747

    
748
            @generic.copy_column_constraints
749
            @generic.delete_column_constraints
750
            def mv_column(self, table, column_old, column_new):
751
                pass
752

    
753
            def _get_setting(self, attr):
754
                return self.settings[attr]
755
        self.CacheOps = CacheOps
756

    
757
    def test_cache(self):
758
        ops = self.CacheOps()
759
        self.assertEqual(0, ops.cache_filled)
760
        self.assertFalse(ops.lookup_constraint('db', 'table'))
761
        self.assertEqual(1, ops.cache_filled)
762
        self.assertFalse(ops.lookup_constraint('db', 'table'))
763
        self.assertEqual(1, ops.cache_filled)
764
        ops.clear_con('table')
765
        self.assertEqual(1, ops.cache_filled)
766
        self.assertFalse(ops.lookup_constraint('db', 'table'))
767
        self.assertEqual(2, ops.cache_filled)
768
        self.assertFalse(ops.lookup_constraint('db', 'table', 'column'))
769
        self.assertEqual(2, ops.cache_filled)
770

    
771
        cache = ops._constraint_cache
772
        cache['db']['table']['column'] = 'constraint'
773
        self.assertEqual('constraint', ops.lookup_constraint('db', 'table', 'column'))
774
        self.assertEqual([('column', 'constraint')], ops.lookup_constraint('db', 'table'))
775
        self.assertEqual(2, ops.cache_filled)
776

    
777
        # invalidate_table_constraints
778
        ops.clear_con('new_table')
779
        self.assertEqual('constraint', ops.lookup_constraint('db', 'table', 'column'))
780
        self.assertEqual(2, ops.cache_filled)
781

    
782
        self.assertFalse(ops.lookup_constraint('db', 'new_table'))
783
        self.assertEqual(3, ops.cache_filled)
784

    
785
        # delete_column_constraints
786
        cache['db']['table']['column'] = 'constraint'
787
        self.assertEqual('constraint', ops.lookup_constraint('db', 'table', 'column'))
788
        ops.rm_column('table', 'column')
789
        self.assertEqual([], ops.lookup_constraint('db', 'table', 'column'))
790
        self.assertEqual([], ops.lookup_constraint('db', 'table', 'noexist_column'))
791

    
792
        # copy_column_constraints
793
        cache['db']['table']['column'] = 'constraint'
794
        self.assertEqual('constraint', ops.lookup_constraint('db', 'table', 'column'))
795
        ops.cp_column('table', 'column', 'column_new')
796
        self.assertEqual('constraint', ops.lookup_constraint('db', 'table', 'column_new'))
797
        self.assertEqual('constraint', ops.lookup_constraint('db', 'table', 'column'))
798

    
799
        # copy + delete
800
        cache['db']['table']['column'] = 'constraint'
801
        self.assertEqual('constraint', ops.lookup_constraint('db', 'table', 'column'))
802
        ops.mv_column('table', 'column', 'column_new')
803
        self.assertEqual('constraint', ops.lookup_constraint('db', 'table', 'column_new'))
804
        self.assertEqual([], ops.lookup_constraint('db', 'table', 'column'))
805
        return
806

    
807
    def test_valid(self):
808
        ops = self.CacheOps()
809
        # none of these should vivify a table into a valid state
810
        self.assertFalse(ops._is_valid_cache('db', 'table'))
811
        self.assertFalse(ops._is_valid_cache('db', 'table'))
812
        ops.clear_con('table')
813
        self.assertFalse(ops._is_valid_cache('db', 'table'))
814
        ops.rm_column('table', 'column')
815
        self.assertFalse(ops._is_valid_cache('db', 'table'))
816

    
817
        # these should change the cache state
818
        ops.lookup_constraint('db', 'table')
819
        self.assertTrue(ops._is_valid_cache('db', 'table'))
820
        ops.lookup_constraint('db', 'table', 'column')
821
        self.assertTrue(ops._is_valid_cache('db', 'table'))
822
        ops.clear_con('table')
823
        self.assertFalse(ops._is_valid_cache('db', 'table'))
824

    
825
    def test_valid_implementation(self):
826
        # generic fills the cache on a per-table basis
827
        ops = self.CacheOps()
828
        self.assertFalse(ops._is_valid_cache('db', 'table'))
829
        self.assertFalse(ops._is_valid_cache('db', 'other_table'))
830
        ops.lookup_constraint('db', 'table')
831
        self.assertTrue(ops._is_valid_cache('db', 'table'))
832
        self.assertFalse(ops._is_valid_cache('db', 'other_table'))
833
        ops.lookup_constraint('db', 'other_table')
834
        self.assertTrue(ops._is_valid_cache('db', 'table'))
835
        self.assertTrue(ops._is_valid_cache('db', 'other_table'))
836
        ops.clear_con('table')
837
        self.assertFalse(ops._is_valid_cache('db', 'table'))
838
        self.assertTrue(ops._is_valid_cache('db', 'other_table'))
839

    
840
if mysql:
841
    class TestCacheMysql(TestCacheGeneric):
842
        base_ops_cls = mysql.DatabaseOperations
843

    
844
        def test_valid_implementation(self):
845
            # mysql fills the cache on a per-db basis
846
            ops = self.CacheOps()
847
            self.assertFalse(ops._is_valid_cache('db', 'table'))
848
            self.assertFalse(ops._is_valid_cache('db', 'other_table'))
849
            ops.lookup_constraint('db', 'table')
850
            self.assertTrue(ops._is_valid_cache('db', 'table'))
851
            self.assertTrue(ops._is_valid_cache('db', 'other_table'))
852
            ops.lookup_constraint('db', 'other_table')
853
            self.assertTrue(ops._is_valid_cache('db', 'table'))
854
            self.assertTrue(ops._is_valid_cache('db', 'other_table'))
855
            ops.clear_con('table')
856
            self.assertFalse(ops._is_valid_cache('db', 'table'))
857
            self.assertTrue(ops._is_valid_cache('db', 'other_table'))