root / env / lib / python2.7 / site-packages / south / tests / db.py @ d1a4905f
History | View | Annotate | Download (33.7 KB)
1 |
import datetime |
---|---|
2 |
|
3 |
from south.db import db, generic |
4 |
from django.db import connection, models, IntegrityError |
5 |
|
6 |
from south.tests import unittest, skipUnless |
7 |
|
8 |
# Create a list of error classes from the various database libraries
|
9 |
errors = [] |
10 |
try:
|
11 |
from psycopg2 import ProgrammingError |
12 |
errors.append(ProgrammingError) |
13 |
except ImportError: |
14 |
pass
|
15 |
errors = tuple(errors)
|
16 |
|
17 |
try:
|
18 |
from south.db import mysql |
19 |
except ImportError: |
20 |
mysql = None
|
21 |
|
22 |
|
23 |
class TestOperations(unittest.TestCase): |
24 |
|
25 |
"""
|
26 |
Tests if the various DB abstraction calls work.
|
27 |
Can only test a limited amount due to DB differences.
|
28 |
"""
|
29 |
|
30 |
def setUp(self): |
31 |
db.debug = False
|
32 |
db.clear_deferred_sql() |
33 |
db.start_transaction() |
34 |
|
35 |
def tearDown(self): |
36 |
db.rollback_transaction() |
37 |
|
38 |
def test_create(self): |
39 |
"""
|
40 |
Test creation of tables.
|
41 |
"""
|
42 |
cursor = connection.cursor() |
43 |
# It needs to take at least 2 args
|
44 |
self.assertRaises(TypeError, db.create_table) |
45 |
self.assertRaises(TypeError, db.create_table, "test1") |
46 |
# Empty tables (i.e. no columns) are not fine, so make at least 1
|
47 |
db.create_table("test1", [('email_confirmed', models.BooleanField(default=False))]) |
48 |
# And should exist
|
49 |
cursor.execute("SELECT * FROM test1")
|
50 |
# Make sure we can't do the same query on an empty table
|
51 |
try:
|
52 |
cursor.execute("SELECT * FROM nottheretest1")
|
53 |
except:
|
54 |
pass
|
55 |
else:
|
56 |
self.fail("Non-existent table could be selected!") |
57 |
|
58 |
def test_delete(self): |
59 |
"""
|
60 |
Test deletion of tables.
|
61 |
"""
|
62 |
cursor = connection.cursor() |
63 |
db.create_table("test_deltable", [('email_confirmed', models.BooleanField(default=False))]) |
64 |
db.delete_table("test_deltable")
|
65 |
# Make sure it went
|
66 |
try:
|
67 |
cursor.execute("SELECT * FROM test_deltable")
|
68 |
except:
|
69 |
pass
|
70 |
else:
|
71 |
self.fail("Just-deleted table could be selected!") |
72 |
|
73 |
def test_nonexistent_delete(self): |
74 |
"""
|
75 |
Test deletion of nonexistent tables.
|
76 |
"""
|
77 |
try:
|
78 |
db.delete_table("test_nonexistdeltable")
|
79 |
except:
|
80 |
pass
|
81 |
else:
|
82 |
self.fail("Non-existent table could be deleted!") |
83 |
|
84 |
def test_foreign_keys(self): |
85 |
"""
|
86 |
Tests foreign key creation, especially uppercase (see #61)
|
87 |
"""
|
88 |
Test = db.mock_model(model_name='Test', db_table='test5a', |
89 |
db_tablespace='', pk_field_name='ID', |
90 |
pk_field_type=models.AutoField, pk_field_args=[]) |
91 |
db.create_table("test5a", [('ID', models.AutoField(verbose_name='ID', primary_key=True, auto_created=True))]) |
92 |
db.create_table("test5b", [
|
93 |
('id', models.AutoField(verbose_name='ID', primary_key=True, auto_created=True)), |
94 |
('UNIQUE', models.ForeignKey(Test)),
|
95 |
]) |
96 |
db.execute_deferred_sql() |
97 |
|
98 |
@skipUnless(db.supports_foreign_keys, 'Foreign keys can only be deleted on ' |
99 |
'engines that support them.')
|
100 |
def test_recursive_foreign_key_delete(self): |
101 |
"""
|
102 |
Test that recursive foreign keys are deleted correctly (see #1065)
|
103 |
"""
|
104 |
Test = db.mock_model(model_name='Test', db_table='test_rec_fk_del', |
105 |
db_tablespace='', pk_field_name='id', |
106 |
pk_field_type=models.AutoField, pk_field_args=[]) |
107 |
db.create_table('test_rec_fk_del', [
|
108 |
('id', models.AutoField(primary_key=True, auto_created=True)), |
109 |
('fk', models.ForeignKey(Test)),
|
110 |
]) |
111 |
db.execute_deferred_sql() |
112 |
db.delete_foreign_key('test_rec_fk_del', 'fk_id') |
113 |
|
114 |
def test_rename(self): |
115 |
"""
|
116 |
Test column renaming
|
117 |
"""
|
118 |
cursor = connection.cursor() |
119 |
db.create_table("test_rn", [('spam', models.BooleanField(default=False))]) |
120 |
# Make sure we can select the column
|
121 |
cursor.execute("SELECT spam FROM test_rn")
|
122 |
# Rename it
|
123 |
db.rename_column("test_rn", "spam", "eggs") |
124 |
cursor.execute("SELECT eggs FROM test_rn")
|
125 |
db.commit_transaction() |
126 |
db.start_transaction() |
127 |
try:
|
128 |
cursor.execute("SELECT spam FROM test_rn")
|
129 |
except:
|
130 |
pass
|
131 |
else:
|
132 |
self.fail("Just-renamed column could be selected!") |
133 |
db.rollback_transaction() |
134 |
db.delete_table("test_rn")
|
135 |
db.start_transaction() |
136 |
|
137 |
def test_dry_rename(self): |
138 |
"""
|
139 |
Test column renaming while --dry-run is turned on (should do nothing)
|
140 |
See ticket #65
|
141 |
"""
|
142 |
cursor = connection.cursor() |
143 |
db.create_table("test_drn", [('spam', models.BooleanField(default=False))]) |
144 |
# Make sure we can select the column
|
145 |
cursor.execute("SELECT spam FROM test_drn")
|
146 |
# Rename it
|
147 |
db.dry_run = True
|
148 |
db.rename_column("test_drn", "spam", "eggs") |
149 |
db.dry_run = False
|
150 |
cursor.execute("SELECT spam FROM test_drn")
|
151 |
db.commit_transaction() |
152 |
db.start_transaction() |
153 |
try:
|
154 |
cursor.execute("SELECT eggs FROM test_drn")
|
155 |
except:
|
156 |
pass
|
157 |
else:
|
158 |
self.fail("Dry-renamed new column could be selected!") |
159 |
db.rollback_transaction() |
160 |
db.delete_table("test_drn")
|
161 |
db.start_transaction() |
162 |
|
163 |
def test_table_rename(self): |
164 |
"""
|
165 |
Test column renaming
|
166 |
"""
|
167 |
cursor = connection.cursor() |
168 |
db.create_table("testtr", [('spam', models.BooleanField(default=False))]) |
169 |
# Make sure we can select the column
|
170 |
cursor.execute("SELECT spam FROM testtr")
|
171 |
# Rename it
|
172 |
db.rename_table("testtr", "testtr2") |
173 |
cursor.execute("SELECT spam FROM testtr2")
|
174 |
db.commit_transaction() |
175 |
db.start_transaction() |
176 |
try:
|
177 |
cursor.execute("SELECT spam FROM testtr")
|
178 |
except:
|
179 |
pass
|
180 |
else:
|
181 |
self.fail("Just-renamed column could be selected!") |
182 |
db.rollback_transaction() |
183 |
db.delete_table("testtr2")
|
184 |
db.start_transaction() |
185 |
|
186 |
def test_percents_in_defaults(self): |
187 |
"""
|
188 |
Test that % in a default gets escaped to %%.
|
189 |
"""
|
190 |
try:
|
191 |
db.create_table("testpind", [('cf', models.CharField(max_length=255, default="It should be 2%!"))]) |
192 |
except IndexError: |
193 |
self.fail("% was not properly escaped in column SQL.") |
194 |
db.delete_table("testpind")
|
195 |
|
196 |
def test_index(self): |
197 |
"""
|
198 |
Test the index operations
|
199 |
"""
|
200 |
db.create_table("test3", [
|
201 |
('SELECT', models.BooleanField(default=False)), |
202 |
('eggs', models.IntegerField(unique=True)), |
203 |
]) |
204 |
db.execute_deferred_sql() |
205 |
# Add an index on that column
|
206 |
db.create_index("test3", ["SELECT"]) |
207 |
# Add another index on two columns
|
208 |
db.create_index("test3", ["SELECT", "eggs"]) |
209 |
# Delete them both
|
210 |
db.delete_index("test3", ["SELECT"]) |
211 |
db.delete_index("test3", ["SELECT", "eggs"]) |
212 |
# Delete the unique index/constraint
|
213 |
if db.backend_name != "sqlite3": |
214 |
db.delete_unique("test3", ["eggs"]) |
215 |
db.delete_table("test3")
|
216 |
|
217 |
def test_primary_key(self): |
218 |
"""
|
219 |
Test the primary key operations
|
220 |
"""
|
221 |
|
222 |
db.create_table("test_pk", [
|
223 |
('id', models.IntegerField(primary_key=True)), |
224 |
('new_pkey', models.IntegerField()),
|
225 |
('eggs', models.IntegerField(unique=True)), |
226 |
]) |
227 |
db.execute_deferred_sql() |
228 |
# Remove the default primary key, and make eggs it
|
229 |
db.delete_primary_key("test_pk")
|
230 |
db.create_primary_key("test_pk", "new_pkey") |
231 |
# Try inserting a now-valid row pair
|
232 |
db.execute("INSERT INTO test_pk (id, new_pkey, eggs) VALUES (1, 2, 3)")
|
233 |
db.execute("INSERT INTO test_pk (id, new_pkey, eggs) VALUES (1, 3, 4)")
|
234 |
db.delete_table("test_pk")
|
235 |
|
236 |
def test_primary_key_implicit(self): |
237 |
"""
|
238 |
Tests that changing primary key implicitly fails.
|
239 |
"""
|
240 |
db.create_table("test_pki", [
|
241 |
('id', models.IntegerField(primary_key=True)), |
242 |
('new_pkey', models.IntegerField()),
|
243 |
('eggs', models.IntegerField(unique=True)), |
244 |
]) |
245 |
db.execute_deferred_sql() |
246 |
# Fiddle with alter_column to attempt to make it remove the primary key
|
247 |
db.alter_column("test_pki", "id", models.IntegerField()) |
248 |
db.alter_column("test_pki", "new_pkey", models.IntegerField(primary_key=True)) |
249 |
# Try inserting a should-be-valid row pair
|
250 |
db.execute("INSERT INTO test_pki (id, new_pkey, eggs) VALUES (1, 2, 3)")
|
251 |
db.execute("INSERT INTO test_pki (id, new_pkey, eggs) VALUES (2, 2, 4)")
|
252 |
db.delete_table("test_pki")
|
253 |
|
254 |
def test_add_columns(self): |
255 |
"""
|
256 |
Test adding columns
|
257 |
"""
|
258 |
db.create_table("test_addc", [
|
259 |
('spam', models.BooleanField(default=False)), |
260 |
('eggs', models.IntegerField()),
|
261 |
]) |
262 |
# Add a column
|
263 |
db.add_column("test_addc", "add1", models.IntegerField(default=3), keep_default=False) |
264 |
# Add a FK with keep_default=False (#69)
|
265 |
User = db.mock_model(model_name='User', db_table='auth_user', db_tablespace='', pk_field_name='id', pk_field_type=models.AutoField, pk_field_args=[], pk_field_kwargs={}) |
266 |
# insert some data so we can test the default value of the added fkey
|
267 |
db.execute("INSERT INTO test_addc (spam, eggs, add1) VALUES (%s, 1, 2)", [False]) |
268 |
db.add_column("test_addc", "user", models.ForeignKey(User, null=True), keep_default=False) |
269 |
db.execute_deferred_sql() |
270 |
# try selecting from the user_id column to make sure it was actually created
|
271 |
val = db.execute("SELECT user_id FROM test_addc")[0][0] |
272 |
self.assertEquals(val, None) |
273 |
db.delete_column("test_addc", "add1") |
274 |
db.delete_table("test_addc")
|
275 |
|
276 |
def test_add_nullbool_column(self): |
277 |
"""
|
278 |
Test adding NullBoolean columns
|
279 |
"""
|
280 |
db.create_table("test_addnbc", [
|
281 |
('spam', models.BooleanField(default=False)), |
282 |
('eggs', models.IntegerField()),
|
283 |
]) |
284 |
# Add a column
|
285 |
db.add_column("test_addnbc", "add1", models.NullBooleanField()) |
286 |
# Add a column with a default
|
287 |
db.add_column("test_addnbc", "add2", models.NullBooleanField(default=True)) |
288 |
# insert some data so we can test the default values of the added column
|
289 |
db.execute("INSERT INTO test_addnbc (spam, eggs) VALUES (%s, 1)", [False]) |
290 |
# try selecting from the new columns to make sure they were properly created
|
291 |
false, null, true = db.execute("SELECT spam,add1,add2 FROM test_addnbc")[0][0:3] |
292 |
self.assertTrue(true)
|
293 |
self.assertEquals(null, None) |
294 |
self.assertEquals(false, False) |
295 |
db.delete_table("test_addnbc")
|
296 |
|
297 |
def test_alter_columns(self): |
298 |
"""
|
299 |
Test altering columns
|
300 |
"""
|
301 |
db.create_table("test_alterc", [
|
302 |
('spam', models.BooleanField(default=False)), |
303 |
('eggs', models.IntegerField()),
|
304 |
]) |
305 |
db.execute_deferred_sql() |
306 |
# Change eggs to be a FloatField
|
307 |
db.alter_column("test_alterc", "eggs", models.FloatField()) |
308 |
db.execute_deferred_sql() |
309 |
db.delete_table("test_alterc")
|
310 |
db.execute_deferred_sql() |
311 |
|
312 |
def test_alter_char_default(self): |
313 |
"""
|
314 |
Test altering column defaults with char fields
|
315 |
"""
|
316 |
db.create_table("test_altercd", [
|
317 |
('spam', models.CharField(max_length=30)), |
318 |
('eggs', models.IntegerField()),
|
319 |
]) |
320 |
# Change spam default
|
321 |
db.alter_column("test_altercd", "spam", models.CharField(max_length=30, default="loof")) |
322 |
|
323 |
def test_mysql_defaults(self): |
324 |
"""
|
325 |
Test MySQL default handling for BLOB and TEXT.
|
326 |
"""
|
327 |
db.create_table("test_altermyd", [
|
328 |
('spam', models.BooleanField(default=False)), |
329 |
('eggs', models.TextField()),
|
330 |
]) |
331 |
# Change eggs to be a FloatField
|
332 |
db.alter_column("test_altermyd", "eggs", models.TextField(null=True)) |
333 |
db.delete_table("test_altermyd")
|
334 |
|
335 |
def test_alter_column_postgres_multiword(self): |
336 |
"""
|
337 |
Tests altering columns with multiple words in Postgres types (issue #125)
|
338 |
e.g. 'datetime with time zone', look at django/db/backends/postgresql/creation.py
|
339 |
"""
|
340 |
db.create_table("test_multiword", [
|
341 |
('col_datetime', models.DateTimeField(null=True)), |
342 |
('col_integer', models.PositiveIntegerField(null=True)), |
343 |
('col_smallint', models.PositiveSmallIntegerField(null=True)), |
344 |
('col_float', models.FloatField(null=True)), |
345 |
]) |
346 |
|
347 |
# test if 'double precision' is preserved
|
348 |
db.alter_column('test_multiword', 'col_float', models.FloatField('float', null=True)) |
349 |
|
350 |
# test if 'CHECK ("%(column)s" >= 0)' is stripped
|
351 |
db.alter_column('test_multiword', 'col_integer', models.PositiveIntegerField(null=True)) |
352 |
db.alter_column('test_multiword', 'col_smallint', models.PositiveSmallIntegerField(null=True)) |
353 |
|
354 |
# test if 'with timezone' is preserved
|
355 |
if db.backend_name == "postgres": |
356 |
db.execute("INSERT INTO test_multiword (col_datetime) VALUES ('2009-04-24 14:20:55+02')")
|
357 |
db.alter_column('test_multiword', 'col_datetime', models.DateTimeField(auto_now=True)) |
358 |
assert db.execute("SELECT col_datetime = '2009-04-24 14:20:55+02' FROM test_multiword")[0][0] |
359 |
|
360 |
db.delete_table("test_multiword")
|
361 |
|
362 |
def test_alter_constraints(self): |
363 |
"""
|
364 |
Tests that going from a PostiveIntegerField to an IntegerField drops
|
365 |
the constraint on the database.
|
366 |
"""
|
367 |
# Only applies to databases that support CHECK constraints
|
368 |
if not db.has_check_constraints: |
369 |
return
|
370 |
# Make the test table
|
371 |
db.create_table("test_alterc", [
|
372 |
('num', models.PositiveIntegerField()),
|
373 |
]) |
374 |
db.execute_deferred_sql() |
375 |
# Add in some test values
|
376 |
db.execute("INSERT INTO test_alterc (num) VALUES (1)")
|
377 |
db.execute("INSERT INTO test_alterc (num) VALUES (2)")
|
378 |
# Ensure that adding a negative number is bad
|
379 |
db.commit_transaction() |
380 |
db.start_transaction() |
381 |
try:
|
382 |
db.execute("INSERT INTO test_alterc (num) VALUES (-3)")
|
383 |
except:
|
384 |
db.rollback_transaction() |
385 |
else:
|
386 |
self.fail("Could insert a negative integer into a PositiveIntegerField.") |
387 |
# Alter it to a normal IntegerField
|
388 |
db.alter_column("test_alterc", "num", models.IntegerField()) |
389 |
db.execute_deferred_sql() |
390 |
# It should now work
|
391 |
db.execute("INSERT INTO test_alterc (num) VALUES (-3)")
|
392 |
db.delete_table("test_alterc")
|
393 |
# We need to match up for tearDown
|
394 |
db.start_transaction() |
395 |
|
396 |
def test_unique(self): |
397 |
"""
|
398 |
Tests creating/deleting unique constraints.
|
399 |
"""
|
400 |
|
401 |
# SQLite backend doesn't support this yet.
|
402 |
if db.backend_name == "sqlite3": |
403 |
return
|
404 |
|
405 |
db.create_table("test_unique2", [
|
406 |
('id', models.AutoField(primary_key=True)), |
407 |
]) |
408 |
db.create_table("test_unique", [
|
409 |
('spam', models.BooleanField(default=False)), |
410 |
('eggs', models.IntegerField()),
|
411 |
('ham', models.ForeignKey(db.mock_model('Unique2', 'test_unique2'))), |
412 |
]) |
413 |
db.execute_deferred_sql() |
414 |
# Add a constraint
|
415 |
db.create_unique("test_unique", ["spam"]) |
416 |
db.execute_deferred_sql() |
417 |
# Shouldn't do anything during dry-run
|
418 |
db.dry_run = True
|
419 |
db.delete_unique("test_unique", ["spam"]) |
420 |
db.dry_run = False
|
421 |
db.delete_unique("test_unique", ["spam"]) |
422 |
db.create_unique("test_unique", ["spam"]) |
423 |
# Special preparations for Sql Server
|
424 |
if db.backend_name == "pyodbc": |
425 |
db.execute("SET IDENTITY_INSERT test_unique2 ON;")
|
426 |
db.execute("INSERT INTO test_unique2 (id) VALUES (1)")
|
427 |
db.execute("INSERT INTO test_unique2 (id) VALUES (2)")
|
428 |
db.commit_transaction() |
429 |
db.start_transaction() |
430 |
|
431 |
|
432 |
# Test it works
|
433 |
TRUE = (True,)
|
434 |
FALSE = (False,)
|
435 |
db.execute("INSERT INTO test_unique (spam, eggs, ham_id) VALUES (%s, 0, 1)", TRUE)
|
436 |
db.execute("INSERT INTO test_unique (spam, eggs, ham_id) VALUES (%s, 1, 2)", FALSE)
|
437 |
try:
|
438 |
db.execute("INSERT INTO test_unique (spam, eggs, ham_id) VALUES (%s, 2, 1)", FALSE)
|
439 |
except:
|
440 |
db.rollback_transaction() |
441 |
else:
|
442 |
self.fail("Could insert non-unique item.") |
443 |
|
444 |
# Drop that, add one only on eggs
|
445 |
db.delete_unique("test_unique", ["spam"]) |
446 |
db.execute("DELETE FROM test_unique")
|
447 |
db.create_unique("test_unique", ["eggs"]) |
448 |
db.start_transaction() |
449 |
|
450 |
# Test similarly
|
451 |
db.execute("INSERT INTO test_unique (spam, eggs, ham_id) VALUES (%s, 0, 1)", TRUE)
|
452 |
db.execute("INSERT INTO test_unique (spam, eggs, ham_id) VALUES (%s, 1, 2)", FALSE)
|
453 |
try:
|
454 |
db.execute("INSERT INTO test_unique (spam, eggs, ham_id) VALUES (%s, 1, 1)", TRUE)
|
455 |
except:
|
456 |
db.rollback_transaction() |
457 |
else:
|
458 |
self.fail("Could insert non-unique item.") |
459 |
|
460 |
# Drop those, test combined constraints
|
461 |
db.delete_unique("test_unique", ["eggs"]) |
462 |
db.execute("DELETE FROM test_unique")
|
463 |
db.create_unique("test_unique", ["spam", "eggs", "ham_id"]) |
464 |
db.start_transaction() |
465 |
# Test similarly
|
466 |
db.execute("INSERT INTO test_unique (spam, eggs, ham_id) VALUES (%s, 0, 1)", TRUE)
|
467 |
db.execute("INSERT INTO test_unique (spam, eggs, ham_id) VALUES (%s, 1, 1)", FALSE)
|
468 |
try:
|
469 |
db.execute("INSERT INTO test_unique (spam, eggs, ham_id) VALUES (%s, 0, 1)", TRUE)
|
470 |
except:
|
471 |
db.rollback_transaction() |
472 |
else:
|
473 |
self.fail("Could insert non-unique pair.") |
474 |
db.delete_unique("test_unique", ["spam", "eggs", "ham_id"]) |
475 |
db.start_transaction() |
476 |
|
477 |
def test_alter_unique(self): |
478 |
"""
|
479 |
Tests that unique constraints are not affected when
|
480 |
altering columns (that's handled by create_/delete_unique)
|
481 |
"""
|
482 |
db.create_table("test_alter_unique", [
|
483 |
('spam', models.IntegerField()),
|
484 |
('eggs', models.IntegerField(unique=True)), |
485 |
]) |
486 |
db.execute_deferred_sql() |
487 |
|
488 |
# Make sure the unique constraint is created
|
489 |
db.execute('INSERT INTO test_alter_unique (spam, eggs) VALUES (0, 42)')
|
490 |
db.commit_transaction() |
491 |
db.start_transaction() |
492 |
try:
|
493 |
db.execute("INSERT INTO test_alter_unique (spam, eggs) VALUES (1, 42)")
|
494 |
except:
|
495 |
pass
|
496 |
else:
|
497 |
self.fail("Could insert the same integer twice into a unique field.") |
498 |
db.rollback_transaction() |
499 |
|
500 |
# Alter without unique=True (should not affect anything)
|
501 |
db.alter_column("test_alter_unique", "eggs", models.IntegerField()) |
502 |
|
503 |
# Insertion should still fail
|
504 |
db.start_transaction() |
505 |
try:
|
506 |
db.execute("INSERT INTO test_alter_unique (spam, eggs) VALUES (1, 42)")
|
507 |
except:
|
508 |
pass
|
509 |
else:
|
510 |
self.fail("Could insert the same integer twice into a unique field after alter_column with unique=False.") |
511 |
db.rollback_transaction() |
512 |
|
513 |
# Delete the unique index/constraint
|
514 |
if db.backend_name != "sqlite3": |
515 |
db.delete_unique("test_alter_unique", ["eggs"]) |
516 |
db.delete_table("test_alter_unique")
|
517 |
db.start_transaction() |
518 |
|
519 |
def test_capitalised_constraints(self): |
520 |
"""
|
521 |
Under PostgreSQL at least, capitalised constraints must be quoted.
|
522 |
"""
|
523 |
db.create_table("test_capconst", [
|
524 |
('SOMECOL', models.PositiveIntegerField(primary_key=True)), |
525 |
]) |
526 |
# Alter it so it's not got the check constraint
|
527 |
db.alter_column("test_capconst", "SOMECOL", models.IntegerField()) |
528 |
|
529 |
def test_text_default(self): |
530 |
"""
|
531 |
MySQL cannot have blank defaults on TEXT columns.
|
532 |
"""
|
533 |
db.create_table("test_textdef", [
|
534 |
('textcol', models.TextField(blank=True)), |
535 |
]) |
536 |
|
537 |
def test_text_to_char(self): |
538 |
"""
|
539 |
On Oracle, you can't simply ALTER TABLE MODIFY a textfield to a charfield
|
540 |
"""
|
541 |
value = "kawabanga"
|
542 |
db.create_table("test_text_to_char", [
|
543 |
('textcol', models.TextField()),
|
544 |
]) |
545 |
db.execute_deferred_sql() |
546 |
db.execute("INSERT INTO test_text_to_char VALUES (%s)", [value])
|
547 |
db.alter_column("test_text_to_char", "textcol", models.CharField(max_length=100)) |
548 |
db.execute_deferred_sql() |
549 |
after = db.execute("select * from test_text_to_char")[0][0] |
550 |
self.assertEqual(value, after, "Change from text to char altered value [ %s != %s ]" % (`value`,`after`)) |
551 |
|
552 |
def test_char_to_text(self): |
553 |
"""
|
554 |
On Oracle, you can't simply ALTER TABLE MODIFY a charfield to a textfield either
|
555 |
"""
|
556 |
value = "agnabawak"
|
557 |
db.create_table("test_char_to_text", [
|
558 |
('textcol', models.CharField(max_length=100)), |
559 |
]) |
560 |
db.execute_deferred_sql() |
561 |
db.execute("INSERT INTO test_char_to_text VALUES (%s)", [value])
|
562 |
db.alter_column("test_char_to_text", "textcol", models.TextField()) |
563 |
db.execute_deferred_sql() |
564 |
after = db.execute("select * from test_char_to_text")[0][0] |
565 |
after = unicode(after) # Oracle text fields return a sort of lazy string -- force evaluation |
566 |
self.assertEqual(value, after, "Change from char to text altered value [ %s != %s ]" % (`value`,`after`)) |
567 |
|
568 |
def test_datetime_default(self): |
569 |
"""
|
570 |
Test that defaults are created correctly for datetime columns
|
571 |
"""
|
572 |
end_of_world = datetime.datetime(2012, 12, 21, 0, 0, 1) |
573 |
|
574 |
try:
|
575 |
from django.utils import timezone |
576 |
except ImportError: |
577 |
pass
|
578 |
else:
|
579 |
from django.conf import settings |
580 |
if getattr(settings, 'USE_TZ', False): |
581 |
end_of_world = end_of_world.replace(tzinfo=timezone.utc) |
582 |
|
583 |
db.create_table("test_datetime_def", [
|
584 |
('col0', models.IntegerField(null=True)), |
585 |
('col1', models.DateTimeField(default=end_of_world)),
|
586 |
('col2', models.DateTimeField(null=True)), |
587 |
]) |
588 |
db.execute_deferred_sql() |
589 |
db.alter_column("test_datetime_def", "col2", models.DateTimeField(default=end_of_world)) |
590 |
db.add_column("test_datetime_def", "col3", models.DateTimeField(default=end_of_world)) |
591 |
db.execute_deferred_sql() |
592 |
# There should not be a default in the database for col1
|
593 |
db.commit_transaction() |
594 |
db.start_transaction() |
595 |
self.assertRaises(
|
596 |
IntegrityError, |
597 |
db.execute, "insert into test_datetime_def (col0) values (null)"
|
598 |
) |
599 |
db.rollback_transaction() |
600 |
db.start_transaction() |
601 |
# There should be for the others
|
602 |
db.execute("insert into test_datetime_def (col0, col1) values (null, %s)", [end_of_world])
|
603 |
ends = db.execute("select col1,col2,col3 from test_datetime_def")[0] |
604 |
self.failUnlessEqual(len(ends), 3) |
605 |
for e in ends: |
606 |
self.failUnlessEqual(e, end_of_world)
|
607 |
|
608 |
def test_add_unique_fk(self): |
609 |
"""
|
610 |
Test adding a ForeignKey with unique=True or a OneToOneField
|
611 |
"""
|
612 |
db.create_table("test_add_unique_fk", [
|
613 |
('spam', models.BooleanField(default=False)) |
614 |
]) |
615 |
|
616 |
db.add_column("test_add_unique_fk", "mock1", models.ForeignKey(db.mock_model('Mock', 'mock'), null=True, unique=True)) |
617 |
db.add_column("test_add_unique_fk", "mock2", models.OneToOneField(db.mock_model('Mock', 'mock'), null=True)) |
618 |
|
619 |
db.delete_table("test_add_unique_fk")
|
620 |
|
621 |
def test_column_constraint(self): |
622 |
"""
|
623 |
Tests that the value constraint of PositiveIntegerField is enforced on
|
624 |
the database level.
|
625 |
"""
|
626 |
if not db.has_check_constraints: |
627 |
return
|
628 |
|
629 |
db.create_table("test_column_constraint", [
|
630 |
('spam', models.PositiveIntegerField()),
|
631 |
]) |
632 |
db.execute_deferred_sql() |
633 |
|
634 |
# Make sure we can't insert negative values
|
635 |
db.commit_transaction() |
636 |
db.start_transaction() |
637 |
try:
|
638 |
db.execute("INSERT INTO test_column_constraint VALUES (-42)")
|
639 |
except:
|
640 |
pass
|
641 |
else:
|
642 |
self.fail("Could insert a negative value into a PositiveIntegerField.") |
643 |
db.rollback_transaction() |
644 |
|
645 |
# remove constraint
|
646 |
db.alter_column("test_column_constraint", "spam", models.IntegerField()) |
647 |
db.execute_deferred_sql() |
648 |
# make sure the insertion works now
|
649 |
db.execute('INSERT INTO test_column_constraint VALUES (-42)')
|
650 |
db.execute('DELETE FROM test_column_constraint')
|
651 |
|
652 |
# add it back again
|
653 |
db.alter_column("test_column_constraint", "spam", models.PositiveIntegerField()) |
654 |
db.execute_deferred_sql() |
655 |
# it should fail again
|
656 |
db.start_transaction() |
657 |
try:
|
658 |
db.execute("INSERT INTO test_column_constraint VALUES (-42)")
|
659 |
except:
|
660 |
pass
|
661 |
else:
|
662 |
self.fail("Could insert a negative value after changing an IntegerField to a PositiveIntegerField.") |
663 |
db.rollback_transaction() |
664 |
|
665 |
db.delete_table("test_column_constraint")
|
666 |
db.start_transaction() |
667 |
|
668 |
def test_sql_defaults(self): |
669 |
"""
|
670 |
Test that sql default value is correct for non-string field types.
|
671 |
Datetimes are handled in test_datetime_default.
|
672 |
"""
|
673 |
|
674 |
class CustomField(models.CharField): |
675 |
__metaclass__ = models.SubfieldBase |
676 |
description = 'CustomField'
|
677 |
def get_default(self): |
678 |
if self.has_default(): |
679 |
if callable(self.default): |
680 |
return self.default() |
681 |
return self.default |
682 |
return super(CustomField, self).get_default() |
683 |
def get_prep_value(self, value): |
684 |
if not value: |
685 |
return value
|
686 |
return ','.join(map(str, value)) |
687 |
def to_python(self, value): |
688 |
if not value or isinstance(value, list): |
689 |
return value
|
690 |
return map(int, value.split(',')) |
691 |
|
692 |
false_value = db.has_booleans and 'False' or '0' |
693 |
defaults = ( |
694 |
(models.CharField(default='sukasuka'), 'DEFAULT \'sukasuka'), |
695 |
(models.BooleanField(default=False), 'DEFAULT %s' % false_value), |
696 |
(models.IntegerField(default=42), 'DEFAULT 42'), |
697 |
(CustomField(default=[2012, 2018, 2021, 2036]), 'DEFAULT \'2012,2018,2021,2036') |
698 |
) |
699 |
for field, sql_test_str in defaults: |
700 |
sql = db.column_sql('fish', 'YAAAAAAZ', field) |
701 |
if sql_test_str not in sql: |
702 |
self.fail("default sql value was not properly generated for field %r.\nSql was %s" % (field, sql)) |
703 |
|
704 |
def test_make_added_foreign_key_not_null(self): |
705 |
# Table for FK to target
|
706 |
User = db.mock_model(model_name='User', db_table='auth_user', db_tablespace='', pk_field_name='id', pk_field_type=models.AutoField, pk_field_args=[], pk_field_kwargs={}) |
707 |
# Table with no foreign key
|
708 |
db.create_table("test_fk", [
|
709 |
('eggs', models.IntegerField()),
|
710 |
]) |
711 |
db.execute_deferred_sql() |
712 |
|
713 |
# Add foreign key
|
714 |
db.add_column("test_fk", 'foreik', models.ForeignKey(User, null=True), |
715 |
keep_default = False)
|
716 |
db.execute_deferred_sql() |
717 |
|
718 |
# Make the FK null
|
719 |
db.alter_column("test_fk", "foreik_id", models.ForeignKey(User)) |
720 |
db.execute_deferred_sql() |
721 |
|
722 |
class TestCacheGeneric(unittest.TestCase): |
723 |
base_ops_cls = generic.DatabaseOperations |
724 |
def setUp(self): |
725 |
class CacheOps(self.base_ops_cls): |
726 |
def __init__(self): |
727 |
self._constraint_cache = {}
|
728 |
self.cache_filled = 0 |
729 |
self.settings = {'NAME': 'db'} |
730 |
|
731 |
def _fill_constraint_cache(self, db, table): |
732 |
self.cache_filled += 1 |
733 |
self._constraint_cache.setdefault(db, {})
|
734 |
self._constraint_cache[db].setdefault(table, {})
|
735 |
|
736 |
@generic.invalidate_table_constraints
|
737 |
def clear_con(self, table): |
738 |
pass
|
739 |
|
740 |
@generic.copy_column_constraints
|
741 |
def cp_column(self, table, column_old, column_new): |
742 |
pass
|
743 |
|
744 |
@generic.delete_column_constraints
|
745 |
def rm_column(self, table, column): |
746 |
pass
|
747 |
|
748 |
@generic.copy_column_constraints
|
749 |
@generic.delete_column_constraints
|
750 |
def mv_column(self, table, column_old, column_new): |
751 |
pass
|
752 |
|
753 |
def _get_setting(self, attr): |
754 |
return self.settings[attr] |
755 |
self.CacheOps = CacheOps
|
756 |
|
757 |
def test_cache(self): |
758 |
ops = self.CacheOps()
|
759 |
self.assertEqual(0, ops.cache_filled) |
760 |
self.assertFalse(ops.lookup_constraint('db', 'table')) |
761 |
self.assertEqual(1, ops.cache_filled) |
762 |
self.assertFalse(ops.lookup_constraint('db', 'table')) |
763 |
self.assertEqual(1, ops.cache_filled) |
764 |
ops.clear_con('table')
|
765 |
self.assertEqual(1, ops.cache_filled) |
766 |
self.assertFalse(ops.lookup_constraint('db', 'table')) |
767 |
self.assertEqual(2, ops.cache_filled) |
768 |
self.assertFalse(ops.lookup_constraint('db', 'table', 'column')) |
769 |
self.assertEqual(2, ops.cache_filled) |
770 |
|
771 |
cache = ops._constraint_cache |
772 |
cache['db']['table']['column'] = 'constraint' |
773 |
self.assertEqual('constraint', ops.lookup_constraint('db', 'table', 'column')) |
774 |
self.assertEqual([('column', 'constraint')], ops.lookup_constraint('db', 'table')) |
775 |
self.assertEqual(2, ops.cache_filled) |
776 |
|
777 |
# invalidate_table_constraints
|
778 |
ops.clear_con('new_table')
|
779 |
self.assertEqual('constraint', ops.lookup_constraint('db', 'table', 'column')) |
780 |
self.assertEqual(2, ops.cache_filled) |
781 |
|
782 |
self.assertFalse(ops.lookup_constraint('db', 'new_table')) |
783 |
self.assertEqual(3, ops.cache_filled) |
784 |
|
785 |
# delete_column_constraints
|
786 |
cache['db']['table']['column'] = 'constraint' |
787 |
self.assertEqual('constraint', ops.lookup_constraint('db', 'table', 'column')) |
788 |
ops.rm_column('table', 'column') |
789 |
self.assertEqual([], ops.lookup_constraint('db', 'table', 'column')) |
790 |
self.assertEqual([], ops.lookup_constraint('db', 'table', 'noexist_column')) |
791 |
|
792 |
# copy_column_constraints
|
793 |
cache['db']['table']['column'] = 'constraint' |
794 |
self.assertEqual('constraint', ops.lookup_constraint('db', 'table', 'column')) |
795 |
ops.cp_column('table', 'column', 'column_new') |
796 |
self.assertEqual('constraint', ops.lookup_constraint('db', 'table', 'column_new')) |
797 |
self.assertEqual('constraint', ops.lookup_constraint('db', 'table', 'column')) |
798 |
|
799 |
# copy + delete
|
800 |
cache['db']['table']['column'] = 'constraint' |
801 |
self.assertEqual('constraint', ops.lookup_constraint('db', 'table', 'column')) |
802 |
ops.mv_column('table', 'column', 'column_new') |
803 |
self.assertEqual('constraint', ops.lookup_constraint('db', 'table', 'column_new')) |
804 |
self.assertEqual([], ops.lookup_constraint('db', 'table', 'column')) |
805 |
return
|
806 |
|
807 |
def test_valid(self): |
808 |
ops = self.CacheOps()
|
809 |
# none of these should vivify a table into a valid state
|
810 |
self.assertFalse(ops._is_valid_cache('db', 'table')) |
811 |
self.assertFalse(ops._is_valid_cache('db', 'table')) |
812 |
ops.clear_con('table')
|
813 |
self.assertFalse(ops._is_valid_cache('db', 'table')) |
814 |
ops.rm_column('table', 'column') |
815 |
self.assertFalse(ops._is_valid_cache('db', 'table')) |
816 |
|
817 |
# these should change the cache state
|
818 |
ops.lookup_constraint('db', 'table') |
819 |
self.assertTrue(ops._is_valid_cache('db', 'table')) |
820 |
ops.lookup_constraint('db', 'table', 'column') |
821 |
self.assertTrue(ops._is_valid_cache('db', 'table')) |
822 |
ops.clear_con('table')
|
823 |
self.assertFalse(ops._is_valid_cache('db', 'table')) |
824 |
|
825 |
def test_valid_implementation(self): |
826 |
# generic fills the cache on a per-table basis
|
827 |
ops = self.CacheOps()
|
828 |
self.assertFalse(ops._is_valid_cache('db', 'table')) |
829 |
self.assertFalse(ops._is_valid_cache('db', 'other_table')) |
830 |
ops.lookup_constraint('db', 'table') |
831 |
self.assertTrue(ops._is_valid_cache('db', 'table')) |
832 |
self.assertFalse(ops._is_valid_cache('db', 'other_table')) |
833 |
ops.lookup_constraint('db', 'other_table') |
834 |
self.assertTrue(ops._is_valid_cache('db', 'table')) |
835 |
self.assertTrue(ops._is_valid_cache('db', 'other_table')) |
836 |
ops.clear_con('table')
|
837 |
self.assertFalse(ops._is_valid_cache('db', 'table')) |
838 |
self.assertTrue(ops._is_valid_cache('db', 'other_table')) |
839 |
|
840 |
if mysql:
|
841 |
class TestCacheMysql(TestCacheGeneric): |
842 |
base_ops_cls = mysql.DatabaseOperations |
843 |
|
844 |
def test_valid_implementation(self): |
845 |
# mysql fills the cache on a per-db basis
|
846 |
ops = self.CacheOps()
|
847 |
self.assertFalse(ops._is_valid_cache('db', 'table')) |
848 |
self.assertFalse(ops._is_valid_cache('db', 'other_table')) |
849 |
ops.lookup_constraint('db', 'table') |
850 |
self.assertTrue(ops._is_valid_cache('db', 'table')) |
851 |
self.assertTrue(ops._is_valid_cache('db', 'other_table')) |
852 |
ops.lookup_constraint('db', 'other_table') |
853 |
self.assertTrue(ops._is_valid_cache('db', 'table')) |
854 |
self.assertTrue(ops._is_valid_cache('db', 'other_table')) |
855 |
ops.clear_con('table')
|
856 |
self.assertFalse(ops._is_valid_cache('db', 'table')) |
857 |
self.assertTrue(ops._is_valid_cache('db', 'other_table')) |