1from __future__ import annotations
2
3import copy
4import warnings
5from collections.abc import Iterable, Iterator, Sequence
6from itertools import chain
7from typing import TYPE_CHECKING, Any, Self, cast
8
9if TYPE_CHECKING:
10 from plain.postgres.meta import Meta
11 from plain.postgres.options import Options
12
13import psycopg
14
15import plain.runtime
16from plain.exceptions import NON_FIELD_ERRORS, ValidationError
17from plain.postgres import models_registry, transaction, types
18from plain.postgres.constants import LOOKUP_SEP
19from plain.postgres.constraints import CheckConstraint, UniqueConstraint
20from plain.postgres.db import PLAIN_VERSION_PICKLE_KEY
21from plain.postgres.dialect import MAX_NAME_LENGTH
22from plain.postgres.exceptions import (
23 DoesNotExistDescriptor,
24 FieldDoesNotExist,
25 MultipleObjectsReturnedDescriptor,
26)
27from plain.postgres.expressions import RawSQL, Value
28from plain.postgres.fields import DATABASE_DEFAULT, NOT_PROVIDED, Field
29from plain.postgres.fields.base import ColumnField, DefaultableField
30from plain.postgres.fields.related import RelatedField
31from plain.postgres.fields.reverse_related import ForeignObjectRel
32from plain.postgres.meta import Meta
33from plain.postgres.options import Options
34from plain.postgres.query import F, Q, QuerySet
35from plain.preflight import PreflightResult
36from plain.utils.encoding import force_str
37from plain.utils.hashable import make_hashable
38
39
class Deferred:
    """Sentinel type marking a field value that was not loaded from the DB."""

    def __repr__(self) -> str:
        return "<Deferred field>"

    # str() and repr() render identically for the sentinel.
    __str__ = __repr__


# Module-level singleton; identity checks (`is DEFERRED`) are used throughout.
DEFERRED = Deferred()
49
50
class ModelBase(type):
    """Metaclass for all models."""

    def __new__(
        cls, name: str, bases: tuple[type, ...], attrs: dict[str, Any], **kwargs: Any
    ) -> type:
        # The root models.Model class itself (no bases) gets no special
        # treatment at all.
        if not bases:
            return super().__new__(cls, name, bases, attrs)

        # Models must inherit from model.Model directly; subclassing a
        # concrete model is rejected up front.
        for parent in bases:
            if issubclass(parent, Model) and parent is not Model:
                raise TypeError(
                    f"A model can't extend another model: {name} extends {parent}"
                )

        return super().__new__(cls, name, bases, attrs, **kwargs)
69
70
class ModelState:
    """Per-instance bookkeeping for a model object."""

    # True while the instance has never been saved. Uniqueness validation
    # treats such instances as new rows — important for models whose primary
    # key is assigned explicitly rather than auto-generated. Only validation
    # consults this flag; the save path itself is unaffected.
    adding = True

    def __init__(self) -> None:
        # Cache of loaded related objects, keyed by field name.
        self.fields_cache: dict[str, Any] = {}
82
83
class Model(metaclass=ModelBase):
    # Every model gets an automatic id field
    id: int = types.PrimaryKeyField()

    # Descriptors for other model behavior
    query: QuerySet[Self] = QuerySet()
    model_options: Options = Options()
    _model_meta: Meta = Meta()
    # Per-model exception types, provided via descriptors so each subclass
    # exposes its own DoesNotExist / MultipleObjectsReturned class.
    DoesNotExist = DoesNotExistDescriptor()
    MultipleObjectsReturned = MultipleObjectsReturnedDescriptor()
94
95 def __init__(self, **kwargs: Any):
96 # Alias some things as locals to avoid repeat global lookups
97 cls = self.__class__
98 meta = cls._model_meta
99 _setattr = setattr
100 _DEFERRED = DEFERRED
101 _DATABASE_DEFAULT = DATABASE_DEFAULT
102
103 # Set up the storage for instance state
104 self._state = ModelState()
105
106 # Process all fields from kwargs or use defaults
107 for field in meta.fields:
108 from plain.postgres.fields.related import RelatedField
109
110 # meta.fields excludes ManyToManyField, so every iterated field
111 # is column-backed and exposes the ColumnField surface.
112 assert isinstance(field, ColumnField)
113
114 is_related_object = False
115 # Virtual field
116 if field.attname not in kwargs and field.column is None:
117 continue
118 if isinstance(field, RelatedField) and isinstance(
119 field.remote_field, ForeignObjectRel
120 ):
121 try:
122 # Assume object instance was passed in.
123 rel_obj = kwargs.pop(field.name)
124 is_related_object = True
125 except KeyError:
126 try:
127 # Object instance wasn't passed in -- must be an ID.
128 val = kwargs.pop(field.attname)
129 except KeyError:
130 val = field.get_default()
131 else:
132 try:
133 val = kwargs.pop(field.attname)
134 except KeyError:
135 # This is done with an exception rather than the
136 # default argument on pop because we don't want
137 # get_default() to be evaluated, and then not used.
138 # Refs #12057.
139 if field.has_db_default():
140 # DB-expression default: let Postgres evaluate it
141 # on INSERT. The compiler emits DEFAULT in the
142 # VALUES clause when it sees this sentinel.
143 val = _DATABASE_DEFAULT
144 else:
145 val = field.get_default()
146
147 if is_related_object:
148 # If we are passed a related instance, set it using the
149 # field.name instead of field.attname (e.g. "user" instead of
150 # "user_id") so that the object gets properly cached (and type
151 # checked) by the RelatedObjectDescriptor.
152 if rel_obj is not _DEFERRED:
153 _setattr(self, field.name, rel_obj)
154 else:
155 if val is not _DEFERRED:
156 _setattr(self, field.attname, val)
157
158 # Handle any remaining kwargs (properties or virtual fields)
159 property_names = meta._property_names
160 unexpected = ()
161 for prop, value in kwargs.items():
162 # Any remaining kwargs must correspond to properties or virtual
163 # fields.
164 if prop in property_names:
165 if value is not _DEFERRED:
166 _setattr(self, prop, value)
167 else:
168 try:
169 meta.get_field(prop)
170 except FieldDoesNotExist:
171 unexpected += (prop,)
172 else:
173 if value is not _DEFERRED:
174 _setattr(self, prop, value)
175 if unexpected:
176 unexpected_names = ", ".join(repr(n) for n in unexpected)
177 raise TypeError(
178 f"{cls.__name__}() got unexpected keyword arguments: {unexpected_names}"
179 )
180
181 super().__init__()
182
183 @classmethod
184 def from_db(cls, field_names: Iterable[str], values: Sequence[Any]) -> Model:
185 if len(values) != len(cls._model_meta.concrete_fields):
186 values_iter = iter(values)
187 values = [
188 next(values_iter) if f.attname in field_names else DEFERRED
189 for f in cls._model_meta.concrete_fields
190 ]
191 # Build kwargs dict from field names and values
192 field_dict = dict(
193 zip((f.attname for f in cls._model_meta.concrete_fields), values)
194 )
195 new = cls(**field_dict)
196 new._state.adding = False
197 return new
198
199 def __repr__(self) -> str:
200 return f"<{self.__class__.__name__}: {self.id}>"
201
202 def __str__(self) -> str:
203 return f"{self.__class__.__name__} object ({self.id})"
204
205 def __eq__(self, other: object) -> bool:
206 if not isinstance(other, Model):
207 return NotImplemented
208 if self.__class__ != other.__class__:
209 return False
210 my_id = self.id
211 if my_id is None:
212 return self is other
213 return my_id == other.id
214
215 def __hash__(self) -> int:
216 if self.id is None:
217 raise TypeError("Model instances without primary key value are unhashable")
218 return hash(self.id)
219
220 def __reduce__(self) -> tuple[Any, tuple[Any, ...], dict[str, Any]]:
221 data = self.__getstate__()
222 data[PLAIN_VERSION_PICKLE_KEY] = plain.runtime.__version__
223 class_id = (
224 self.model_options.package_label,
225 self.model_options.object_name,
226 )
227 return model_unpickle, (class_id,), data
228
229 def __getstate__(self) -> dict[str, Any]:
230 """Hook to allow choosing the attributes to pickle."""
231 state = self.__dict__.copy()
232 state["_state"] = copy.copy(state["_state"])
233 state["_state"].fields_cache = state["_state"].fields_cache.copy()
234 # memoryview cannot be pickled, so cast it to bytes and store
235 # separately.
236 _memoryview_attrs = []
237 for attr, value in state.items():
238 if isinstance(value, memoryview):
239 _memoryview_attrs.append((attr, bytes(value)))
240 if _memoryview_attrs:
241 state["_memoryview_attrs"] = _memoryview_attrs
242 for attr, value in _memoryview_attrs:
243 state.pop(attr)
244 return state
245
246 def __setstate__(self, state: dict[str, Any]) -> None:
247 pickled_version = state.get(PLAIN_VERSION_PICKLE_KEY)
248 if pickled_version:
249 if pickled_version != plain.runtime.__version__:
250 warnings.warn(
251 f"Pickled model instance's Plain version {pickled_version} does not "
252 f"match the current version {plain.runtime.__version__}.",
253 RuntimeWarning,
254 stacklevel=2,
255 )
256 else:
257 warnings.warn(
258 "Pickled model instance's Plain version is not specified.",
259 RuntimeWarning,
260 stacklevel=2,
261 )
262 if "_memoryview_attrs" in state:
263 for attr, value in state.pop("_memoryview_attrs"):
264 state[attr] = memoryview(value)
265 self.__dict__.update(state)
266
267 def get_deferred_fields(self) -> set[str]:
268 """
269 Return a set containing names of deferred fields for this instance.
270 """
271 return {
272 f.attname
273 for f in self._model_meta.concrete_fields
274 if f.attname not in self.__dict__
275 }
276
277 def refresh_from_db(self, fields: list[str] | None = None) -> None:
278 """
279 Reload field values from the database.
280
281 Fields can be used to specify which fields to reload. If fields is
282 None, then all non-deferred fields are reloaded.
283
284 When accessing deferred fields of an instance, the deferred loading
285 of the field will call this method.
286 """
287 if fields is None:
288 self._prefetched_objects_cache = {}
289 else:
290 prefetched_objects_cache = getattr(self, "_prefetched_objects_cache", {})
291 for field in fields:
292 if field in prefetched_objects_cache:
293 del prefetched_objects_cache[field]
294 fields.remove(field)
295 if not fields:
296 return
297 if any(LOOKUP_SEP in f for f in fields):
298 raise ValueError(
299 f'Found "{LOOKUP_SEP}" in fields argument. Relations and transforms '
300 "are not allowed in fields."
301 )
302
303 db_instance_qs = self._model_meta.base_queryset.filter(id=self.id)
304
305 # Use provided fields, if not set then reload all non-deferred fields.
306 deferred_fields = self.get_deferred_fields()
307 if fields is not None:
308 fields = list(fields)
309 db_instance_qs = db_instance_qs.only(*fields)
310 elif deferred_fields:
311 fields = [
312 f.attname
313 for f in self._model_meta.concrete_fields
314 if f.attname not in deferred_fields
315 ]
316 db_instance_qs = db_instance_qs.only(*fields)
317
318 db_instance = db_instance_qs.get()
319 non_loaded_fields = db_instance.get_deferred_fields()
320 for field in self._model_meta.concrete_fields:
321 if field.attname in non_loaded_fields:
322 # This field wasn't refreshed - skip ahead.
323 continue
324 setattr(self, field.attname, getattr(db_instance, field.attname))
325 # Clear cached foreign keys.
326 if isinstance(field, RelatedField) and field.is_cached(self):
327 field.delete_cached_value(self)
328
329 # Clear cached relations.
330 for field in self._model_meta.related_objects:
331 if field.is_cached(self):
332 field.delete_cached_value(self)
333
334 def serializable_value(self, field_name: str) -> Any:
335 """
336 Return the value of the field name for this instance. If the field is
337 a foreign key, return the id value instead of the object. If there's
338 no Field object with this name on the model, return the model
339 attribute's value.
340
341 Used to serialize a field's value (in the serializer, or form output,
342 for example). Normally, you would just access the attribute directly
343 and not use this method.
344 """
345 try:
346 field = self._model_meta.get_forward_field(field_name)
347 except FieldDoesNotExist:
348 return getattr(self, field_name)
349 return getattr(self, field.attname)
350
    def save(
        self,
        *,
        clean_and_validate: bool = True,
        force_insert: bool = False,
        force_update: bool = False,
        update_fields: Iterable[str] | None = None,
    ) -> None:
        """
        Save the current instance. Override this in a subclass if you want to
        control the saving process.

        The 'force_insert' and 'force_update' parameters can be used to insist
        that the "save" must be an SQL INSERT or UPDATE, respectively.
        Normally, they should not be set.

        When 'clean_and_validate' is True (the default), full_clean() runs
        before the write, excluding any deferred fields.

        Raises ValueError for contradictory force flags or for update_fields
        entries that are not concrete fields of this model.
        """
        # Reject FKs that point at unsaved related instances before writing.
        self._prepare_related_fields_for_save(operation_name="save")

        if force_insert and (force_update or update_fields):
            raise ValueError("Cannot force both insert and updating in model saving.")

        deferred_fields = self.get_deferred_fields()
        if update_fields is not None:
            # Empty update_fields is a no-op save — skip the whole pipeline.
            if not update_fields:
                return

            update_fields = frozenset(update_fields)
            field_names = self._model_meta._non_pk_concrete_field_names
            non_model_fields = update_fields.difference(field_names)

            if non_model_fields:
                raise ValueError(
                    "The following fields do not exist in this model, are m2m "
                    "fields, or are non-concrete fields: {}".format(
                        ", ".join(non_model_fields)
                    )
                )

        # If this model is deferred, automatically do an "update_fields" save
        # on the loaded fields.
        elif not force_insert and deferred_fields:
            field_names = set()
            for field in self._model_meta.concrete_fields:
                # hasattr(field, "through") screens out m2m-style fields,
                # which are not written through this table's UPDATE.
                if not field.primary_key and not hasattr(field, "through"):
                    field_names.add(field.attname)
            loaded_fields = field_names.difference(deferred_fields)
            if loaded_fields:
                update_fields = frozenset(loaded_fields)

        if clean_and_validate:
            # Deferred fields have no loaded value to validate against.
            self.full_clean(exclude=deferred_fields)

        self.save_base(
            force_insert=force_insert,
            force_update=force_update,
            update_fields=update_fields,
        )
409
    def save_base(
        self,
        *,
        raw: bool = False,
        force_insert: bool = False,
        force_update: bool = False,
        update_fields: Iterable[str] | None = None,
    ) -> None:
        """
        Handle the parts of saving shared between the normal and raw paths.

        The 'raw' argument tells save_base to skip per-field value
        conversions — used by fixture loading, which has already produced
        values in their final form.
        """
        # save() has already rejected these combinations; the asserts are
        # internal sanity checks on callers of this lower-level entry point.
        assert not (force_insert and (force_update or update_fields))
        assert update_fields is None or update_fields
        cls = self.__class__

        # Mark the connection so a failed write leaves outer atomic() blocks
        # aware that the DB transaction is aborted.
        with transaction.mark_for_rollback_on_error():
            self._save_table(
                raw=raw,
                cls=cls,
                force_insert=force_insert,
                force_update=force_update,
                update_fields=update_fields,
            )
        # Once saved, this is no longer a to-be-added instance.
        self._state.adding = False
439
440 def _save_table(
441 self,
442 *,
443 raw: bool,
444 cls: type[Model],
445 force_insert: bool = False,
446 force_update: bool = False,
447 update_fields: Iterable[str] | None = None,
448 ) -> bool:
449 """
450 Do the heavy-lifting involved in saving. Update or insert the data
451 for a single table.
452 """
453 meta = cls._model_meta
454 non_pks = [f for f in meta.local_concrete_fields if not f.primary_key]
455
456 if update_fields:
457 non_pks = [
458 f
459 for f in non_pks
460 if f.name in update_fields or f.attname in update_fields
461 ]
462
463 id_field = meta.get_forward_field("id")
464 id_val = self.id
465 if id_val is None:
466 # User-declared literal default on the PK? Materialize it so the
467 # INSERT carries the Python value rather than letting the DB
468 # generate one. Identity PKs have no such default, so id_val
469 # stays None and the INSERT emits DEFAULT.
470 if isinstance(id_field, DefaultableField) and id_field.has_default():
471 id_val = id_field.get_default()
472 setattr(self, id_field.attname, id_val)
473 id_set = id_val is not None
474 if not id_set and (force_update or update_fields):
475 raise ValueError("Cannot force an update in save() with no primary key.")
476 updated = False
477 # Skip an UPDATE when adding an instance and primary key has a default.
478 if (
479 not raw
480 and not force_insert
481 and self._state.adding
482 and isinstance(id_field, DefaultableField)
483 and id_field.default
484 and id_field.default is not NOT_PROVIDED
485 ):
486 force_insert = True
487 # If possible, try an UPDATE. If that doesn't update anything, do an INSERT.
488 if id_set and not force_insert:
489 base_qs = meta.base_queryset
490 values = [
491 (f, (getattr(self, f.attname) if raw else f.pre_save(self, False)))
492 for f in non_pks
493 ]
494 # DATABASE_DEFAULT fields represent "let the DB produce this on
495 # INSERT" — they have no meaningful UPDATE semantic, so skip them
496 # on the UPDATE path. If the row doesn't exist the INSERT fallback
497 # below handles them correctly. If the UPDATE *does* succeed, we
498 # need to refresh those fields from the DB so the in-memory
499 # instance doesn't keep the sentinel.
500 db_default_attnames = [
501 v[0].attname for v in values if v[1] is DATABASE_DEFAULT
502 ]
503 values = [v for v in values if v[1] is not DATABASE_DEFAULT]
504 forced_update = bool(update_fields or force_update)
505 updated = self._do_update(
506 base_qs, id_val, values, update_fields, forced_update
507 )
508 if force_update and not updated:
509 raise psycopg.DatabaseError("Forced update did not affect any rows.")
510 if update_fields and not updated:
511 raise psycopg.DatabaseError(
512 "Save with update_fields did not affect any rows."
513 )
514 if updated and db_default_attnames:
515 self.refresh_from_db(fields=db_default_attnames)
516 if not updated:
517 fields = meta.local_concrete_fields
518 if not id_set:
519 id_field = meta.get_forward_field("id")
520 fields = [f for f in fields if f is not id_field]
521
522 returning_fields = meta.db_returning_fields
523 results = self._do_insert(meta.base_queryset, fields, returning_fields, raw)
524 if results:
525 for value, field in zip(results[0], returning_fields):
526 setattr(self, field.attname, value)
527 return updated
528
529 def _do_update(
530 self,
531 base_qs: QuerySet,
532 id_val: Any,
533 values: list[tuple[Any, Any]],
534 update_fields: Iterable[str] | None,
535 forced_update: bool,
536 ) -> bool:
537 """
538 Try to update the model. Return True if the model was updated (if an
539 update query was done and a matching row was found in the DB).
540 """
541 filtered = base_qs.filter(id=id_val)
542 if not values:
543 # Nothing to update — either the caller passed update_fields
544 # (so "success" means "we ran with no fields"), or the model has
545 # only its PK (in which case confirm the row still exists).
546 return update_fields is not None or filtered.exists()
547 return filtered._update(values) > 0
548
549 def _do_insert(
550 self,
551 manager: QuerySet,
552 fields: Sequence[Any],
553 returning_fields: Sequence[Any],
554 raw: bool,
555 ) -> list[tuple[Any, ...]] | None:
556 """
557 Do an INSERT. If returning_fields is defined then this method should
558 return the newly created data for the model.
559 """
560 return manager._insert(
561 [self],
562 fields=list(fields),
563 returning_fields=list(returning_fields) if returning_fields else None,
564 raw=raw,
565 )
566
    def _prepare_related_fields_for_save(
        self, operation_name: str, fields: Sequence[Any] | None = None
    ) -> None:
        """
        Verify cached related objects are saved before this instance is.

        'operation_name' names the calling operation for the error message
        (e.g. "save"). 'fields' optionally restricts which fields to check.

        Raises ValueError when a cached related object has no primary key.
        """
        # Ensure that a model instance without a PK hasn't been assigned to
        # a ForeignKeyField on this model. If the field is nullable, allowing the save would result in silent data loss.
        for field in self._model_meta.concrete_fields:
            if fields and field not in fields:
                continue
            # If the related field isn't cached, then an instance hasn't been
            # assigned and there's no need to worry about this check.
            if isinstance(field, RelatedField) and field.is_cached(self):
                obj = getattr(self, field.name, None)
                if not obj:
                    continue
                # A pk may have been assigned manually to a model instance not
                # saved to the database (or auto-generated in a case like
                # UUIDField), but we allow the save to proceed and rely on the
                # database to raise an IntegrityError if applicable. If
                # constraints aren't supported by the database, there's the
                # unavoidable risk of data corruption.
                if obj.id is None:
                    # Remove the object from a related instance cache.
                    if not field.remote_field.multiple:
                        field.remote_field.delete_cached_value(obj)
                    raise ValueError(
                        f"{operation_name}() prohibited to prevent data loss due to unsaved "
                        f"related object '{field.name}'."
                    )
                elif getattr(self, field.attname) in field.empty_values:
                    # Set related object if it has been saved after an
                    # assignment.
                    setattr(self, field.name, obj)
                # If the relationship's pk/to_field was changed, clear the
                # cached relationship.
                if getattr(obj, field.target_field.attname) != getattr(
                    self, field.attname
                ):
                    field.delete_cached_value(self)
605
606 def delete(self) -> int:
607 """Delete this row. Returns the number of rows deleted (1 or 0).
608
609 Cascades are handled entirely by Postgres via the `on_delete`
610 clauses declared on related foreign keys.
611 """
612 if self.id is None:
613 raise ValueError(
614 f"{self.model_options.object_name} object can't be deleted because its id attribute is set "
615 "to None."
616 )
617 # Use base_queryset to bypass any user-defined filters on the public
618 # query (e.g. soft-delete scopes). An instance we have a reference to
619 # should always be deletable — custom querysets shape reads, not
620 # internal row lifecycle operations.
621 #
622 # mark_for_rollback_on_error: FK errors (RESTRICT / NO_ACTION) leave
623 # the DB transaction aborted. Mark the connection so outer atomic()
624 # blocks see the abort state even if the caller catches IntegrityError.
625 with transaction.mark_for_rollback_on_error():
626 count = self._model_meta.base_queryset.filter(id=self.id)._raw_delete()
627 setattr(self, self._model_meta.get_forward_field("id").attname, None)
628 return count
629
630 def get_field_display(self, field_name: str) -> str:
631 """Get the display value for a field, especially useful for fields with choices."""
632 # Get the field object from the field name
633 field = self._model_meta.get_forward_field(field_name)
634 value = getattr(self, field.attname)
635
636 # If field has no choices, just return the value as string
637 if not hasattr(field, "flatchoices") or not field.flatchoices:
638 return force_str(value, strings_only=True)
639
640 # For fields with choices, look up the display value
641 choices_dict = dict(make_hashable(field.flatchoices))
642 return force_str(
643 choices_dict.get(make_hashable(value), value), strings_only=True
644 )
645
646 def _get_field_value_map(
647 self, meta: Meta | None, exclude: set[str] | None = None
648 ) -> dict[str, Value]:
649 if exclude is None:
650 exclude = set()
651 meta = meta or self._model_meta
652 return {
653 field.name: Value(getattr(self, field.attname), field)
654 for field in meta.local_concrete_fields
655 if field.name not in exclude
656 }
657
658 def prepare_database_save(self, field: Any) -> Any:
659 if self.id is None:
660 raise ValueError(
661 f"Unsaved model instance {self!r} cannot be used in an ORM query."
662 )
663 return getattr(self, field.remote_field.get_related_field().attname)
664
    def clean(self) -> None:
        """
        Hook for doing any extra model-wide validation after clean() has been
        called on every field by self.clean_fields. Any ValidationError raised
        by this method will not be associated with a particular field; it will
        have a special-case association with the field defined by NON_FIELD_ERRORS.

        The default implementation does nothing.
        """
        pass
673
674 def validate_unique(self, exclude: set[str] | None = None) -> None:
675 """
676 Check unique constraints on the model and raise ValidationError if any
677 failed.
678 """
679 unique_checks = self._get_unique_checks(exclude=exclude)
680
681 if errors := self._perform_unique_checks(unique_checks):
682 raise ValidationError(errors)
683
684 def _get_unique_checks(
685 self, exclude: set[str] | None = None
686 ) -> list[tuple[type[Model], tuple[str, ...]]]:
687 """
688 Return a list of checks to perform. Since validate_unique() could be
689 called from a ModelForm, some fields may have been excluded; we can't
690 perform a unique check on a model that is missing fields involved
691 in that check. Fields that did not validate should also be excluded,
692 but they need to be passed in via the exclude argument.
693 """
694 if exclude is None:
695 exclude = set()
696 unique_checks = []
697
698 # Gather a list of checks for fields declared as unique and add them to
699 # the list of checks.
700
701 fields_with_class = [(self.__class__, self._model_meta.local_fields)]
702
703 for model_class, fields in fields_with_class:
704 for f in fields:
705 name = f.name
706 if name in exclude:
707 continue
708 if f.primary_key:
709 unique_checks.append((model_class, (name,)))
710
711 return unique_checks
712
713 def _perform_unique_checks(
714 self, unique_checks: list[tuple[type[Model], tuple[str, ...]]]
715 ) -> dict[str, list[ValidationError]]:
716 errors = {}
717
718 for model_class, unique_check in unique_checks:
719 # Try to look up an existing object with the same values as this
720 # object's values for all the unique field.
721
722 lookup_kwargs = {}
723 for field_name in unique_check:
724 f = self._model_meta.get_forward_field(field_name)
725 lookup_value = getattr(self, f.attname)
726 if lookup_value is None:
727 # no value, skip the lookup
728 continue
729 if f.primary_key and not self._state.adding:
730 # no need to check for unique primary key when editing
731 continue
732 lookup_kwargs[str(field_name)] = lookup_value
733
734 # some fields were skipped, no reason to do the check
735 if len(unique_check) != len(lookup_kwargs):
736 continue
737
738 qs = model_class.query.filter(**lookup_kwargs)
739
740 # Exclude the current object from the query if we are editing an
741 # instance (as opposed to creating a new one).
742 model_class_id = getattr(self, "id")
743 if not self._state.adding and model_class_id is not None:
744 qs = qs.exclude(id=model_class_id)
745 if qs.exists():
746 if len(unique_check) == 1:
747 key = unique_check[0]
748 else:
749 key = NON_FIELD_ERRORS
750 errors.setdefault(key, []).append(
751 self.unique_error_message(model_class, unique_check)
752 )
753
754 return errors
755
    def unique_error_message(
        self, model_class: type[Model], unique_check: tuple[str, ...]
    ) -> ValidationError:
        """
        Build the ValidationError reported when 'unique_check' fails.

        Single-field checks use that field's unique_error_message; multi-field
        checks use the first field's message with a combined field label
        ("a, b, and c" / "a and b").
        """
        meta = model_class._model_meta

        params = {
            "model": self,
            "model_class": model_class,
            "model_name": model_class.model_options.model_name,
            "unique_check": unique_check,
        }

        if len(unique_check) == 1:
            field = meta.get_forward_field(unique_check[0])
            params["field_label"] = field.name  # ty: ignore[invalid-assignment]
            return ValidationError(
                message=field.unique_error_message,
                code="unique",
                params=params,
            )
        else:
            field_names = [meta.get_forward_field(f).name for f in unique_check]

            # Put an "and" before the last one
            field_names[-1] = f"and {field_names[-1]}"

            if len(field_names) > 2:
                # Comma join if more than 2
                params["field_label"] = ", ".join(cast(list[str], field_names))
            else:
                # Just a space if there are only 2
                params["field_label"] = " ".join(cast(list[str], field_names))

            # Use the first field as the message format...
            message = meta.get_forward_field(unique_check[0]).unique_error_message

            return ValidationError(
                message=message,
                code="unique",
                params=params,
            )
797
798 def get_constraints(self) -> list[tuple[type[Model], list[Any]]]:
799 constraints: list[tuple[type[Model], list[Any]]] = [
800 (self.__class__, list(self.model_options.constraints))
801 ]
802 return constraints
803
804 def validate_constraints(self, exclude: set[str] | None = None) -> None:
805 constraints = self.get_constraints()
806
807 errors: dict[str, list[ValidationError]] = {}
808 for model_class, model_constraints in constraints:
809 for constraint in model_constraints:
810 try:
811 constraint.validate(model_class, self, exclude=exclude)
812 except ValidationError as e:
813 errors = e.update_error_dict(errors)
814 if errors:
815 raise ValidationError(errors)
816
    def full_clean(
        self,
        *,
        exclude: set[str] | Iterable[str] | None = None,
        validate_unique: bool = True,
        validate_constraints: bool = True,
    ) -> None:
        """
        Call clean_fields(), clean(), validate_unique(), and
        validate_constraints() on the model. Raise a ValidationError for any
        errors that occur.

        'exclude' names fields skipped at every step; the boolean flags turn
        off the uniqueness and constraint passes respectively.
        """
        errors = {}
        if exclude is None:
            exclude = set()
        else:
            # Copy: the steps below add names to this set, and the caller's
            # collection must not be mutated.
            exclude = set(exclude)

        # Fields holding the DATABASE_DEFAULT sentinel will be produced by
        # the database on INSERT — there's no Python value to clean,
        # uniqueness-check, or feed into a constraint lookup. Exclude them
        # from every validation step until the value is populated. Read via
        # __dict__ to avoid triggering refresh_from_db on deferred fields.
        # Also exclude fields that pre_save fills in (e.g. update_now) —
        # the value isn't present yet but will be before the INSERT/UPDATE.
        for f in self._model_meta.fields:
            if f.name in exclude:
                continue
            if self.__dict__.get(f.attname) is DATABASE_DEFAULT:
                exclude.add(f.name)
            elif f.auto_fills_on_save:
                exclude.add(f.name)

        try:
            self.clean_fields(exclude=exclude)
        except ValidationError as e:
            errors = e.update_error_dict(errors)

        # Form.clean() is run even if other validation fails, so do the
        # same with Model.clean() for consistency.
        try:
            self.clean()
        except ValidationError as e:
            errors = e.update_error_dict(errors)

        # Run unique checks, but only for fields that passed validation.
        if validate_unique:
            for name in errors:
                if name != NON_FIELD_ERRORS and name not in exclude:
                    exclude.add(name)
            try:
                self.validate_unique(exclude=exclude)
            except ValidationError as e:
                errors = e.update_error_dict(errors)

        # Run constraints checks, but only for fields that passed validation.
        if validate_constraints:
            for name in errors:
                if name != NON_FIELD_ERRORS and name not in exclude:
                    exclude.add(name)
            try:
                self.validate_constraints(exclude=exclude)
            except ValidationError as e:
                errors = e.update_error_dict(errors)

        if errors:
            raise ValidationError(errors)
884
885 def clean_fields(self, exclude: set[str] | None = None) -> None:
886 """
887 Clean all fields and raise a ValidationError containing a dict
888 of all validation errors if any occur.
889 """
890 if exclude is None:
891 exclude = set()
892
893 errors = {}
894 for f in self._model_meta.fields:
895 if f.name in exclude:
896 continue
897 # Skip validation for empty fields with required=False. The developer
898 # is responsible for making sure they have a valid value.
899 raw_value = getattr(self, f.attname)
900 if not f.required and raw_value in f.empty_values:
901 continue
902 try:
903 setattr(self, f.attname, f.clean(raw_value, self))
904 except ValidationError as e:
905 errors[f.name] = e.error_list
906
907 if errors:
908 raise ValidationError(errors)
909
910 @classmethod
911 def preflight(cls) -> list[PreflightResult]:
912 errors: list[PreflightResult] = []
913
914 errors += [
915 *cls._check_fields(),
916 *cls._check_m2m_through_same_relationship(),
917 *cls._check_long_column_names(),
918 ]
919 clash_errors = (
920 *cls._check_id_field(),
921 *cls._check_field_name_clashes(),
922 *cls._check_model_name_db_lookup_clashes(),
923 *cls._check_property_name_related_field_accessor_clashes(),
924 *cls._check_single_primary_key(),
925 )
926 errors.extend(clash_errors)
927 # If there are field name clashes, hide consequent column name
928 # clashes.
929 if not clash_errors:
930 errors.extend(cls._check_column_name_clashes())
931 errors += [
932 *cls._check_indexes(),
933 *cls._check_ordering(),
934 *cls._check_constraints(),
935 ]
936
937 return errors
938
939 @classmethod
940 def _check_fields(cls) -> list[PreflightResult]:
941 """Perform all field checks."""
942 errors: list[PreflightResult] = []
943 for field in cls._model_meta.local_fields:
944 errors.extend(field.preflight(from_model=cls))
945 for field in cls._model_meta.local_many_to_many:
946 errors.extend(field.preflight(from_model=cls))
947 return errors
948
949 @classmethod
950 def _check_m2m_through_same_relationship(cls) -> list[PreflightResult]:
951 """Check if no relationship model is used by more than one m2m field."""
952
953 errors: list[PreflightResult] = []
954 seen_intermediary_signatures = []
955
956 fields = cls._model_meta.local_many_to_many
957
958 # Skip when the target model wasn't found.
959 fields = (f for f in fields if isinstance(f.remote_field.model, ModelBase))
960
961 # Skip when the relationship model wasn't found.
962 fields = (f for f in fields if isinstance(f.remote_field.through, ModelBase))
963
964 for f in fields:
965 signature = (
966 f.remote_field.model,
967 cls,
968 f.remote_field.through,
969 f.remote_field.through_fields,
970 )
971 if signature in seen_intermediary_signatures:
972 errors.append(
973 PreflightResult(
974 fix="The model has two identical many-to-many relations "
975 f"through the intermediate model '{f.remote_field.through.model_options.label}'.",
976 obj=cls,
977 id="postgres.duplicate_many_to_many_relations",
978 )
979 )
980 else:
981 seen_intermediary_signatures.append(signature)
982 return errors
983
984 @classmethod
985 def _check_id_field(cls) -> list[PreflightResult]:
986 """Disallow user-defined fields named ``id``."""
987 if any(
988 f
989 for f in cls._model_meta.local_fields
990 if f.name == "id" and not f.auto_created
991 ):
992 return [
993 PreflightResult(
994 fix="'id' is a reserved word that cannot be used as a field name.",
995 obj=cls,
996 id="postgres.reserved_field_name_id",
997 )
998 ]
999 return []
1000
1001 @classmethod
1002 def _check_field_name_clashes(cls) -> list[PreflightResult]:
1003 """Reject fields that share a name or attname within the same model."""
1004 errors: list[PreflightResult] = []
1005 used_fields = {} # name or attname -> field
1006
1007 for f in cls._model_meta.local_fields:
1008 clash = used_fields.get(f.name) or used_fields.get(f.attname) or None
1009 # Note that we may detect clash between user-defined non-unique
1010 # field "id" and automatically added unique field "id", both
1011 # defined at the same model. This special case is considered in
1012 # _check_id_field and here we ignore it.
1013 id_conflict = (
1014 f.name == "id" and clash and clash.name == "id" and clash.model == cls
1015 )
1016 if clash and not id_conflict:
1017 errors.append(
1018 PreflightResult(
1019 fix=f"The field '{f.name}' clashes with the field '{clash.name}' "
1020 f"from model '{clash.model.model_options}'.",
1021 obj=f,
1022 id="postgres.field_name_clash",
1023 )
1024 )
1025 used_fields[f.name] = f
1026 used_fields[f.attname] = f
1027
1028 return errors
1029
1030 @classmethod
1031 def _check_column_name_clashes(cls) -> list[PreflightResult]:
1032 # Store a list of column names which have already been used by other fields.
1033 used_column_names: list[str] = []
1034 errors: list[PreflightResult] = []
1035
1036 for f in cls._model_meta.local_fields:
1037 column_name = f.column
1038
1039 # Ensure the column name is not already in use.
1040 if column_name and column_name in used_column_names:
1041 errors.append(
1042 PreflightResult(
1043 fix=f"Field '{f.name}' has column name '{column_name}' that is used by "
1044 "another field.",
1045 obj=cls,
1046 id="postgres.db_column_clash",
1047 )
1048 )
1049 else:
1050 used_column_names.append(column_name)
1051
1052 return errors
1053
1054 @classmethod
1055 def _check_model_name_db_lookup_clashes(cls) -> list[PreflightResult]:
1056 errors: list[PreflightResult] = []
1057 model_name = cls.__name__
1058 if model_name.startswith("_") or model_name.endswith("_"):
1059 errors.append(
1060 PreflightResult(
1061 fix=f"The model name '{model_name}' cannot start or end with an underscore "
1062 "as it collides with the query lookup syntax.",
1063 obj=cls,
1064 id="postgres.model_name_underscore_bounds",
1065 )
1066 )
1067 elif LOOKUP_SEP in model_name:
1068 errors.append(
1069 PreflightResult(
1070 fix=f"The model name '{model_name}' cannot contain double underscores as "
1071 "it collides with the query lookup syntax.",
1072 obj=cls,
1073 id="postgres.model_name_double_underscore",
1074 )
1075 )
1076 return errors
1077
1078 @classmethod
1079 def _check_property_name_related_field_accessor_clashes(
1080 cls,
1081 ) -> list[PreflightResult]:
1082 errors: list[PreflightResult] = []
1083 property_names = cls._model_meta._property_names
1084 related_field_accessors = (
1085 f.get_attname()
1086 for f in cls._model_meta._get_fields(reverse=False)
1087 if isinstance(f, RelatedField)
1088 )
1089 for accessor in related_field_accessors:
1090 if accessor in property_names:
1091 errors.append(
1092 PreflightResult(
1093 fix=f"The property '{accessor}' clashes with a related field "
1094 "accessor.",
1095 obj=cls,
1096 id="postgres.property_related_field_clash",
1097 )
1098 )
1099 return errors
1100
1101 @classmethod
1102 def _check_single_primary_key(cls) -> list[PreflightResult]:
1103 errors: list[PreflightResult] = []
1104 if sum(1 for f in cls._model_meta.local_fields if f.primary_key) > 1:
1105 errors.append(
1106 PreflightResult(
1107 fix="The model cannot have more than one field with "
1108 "'primary_key=True'.",
1109 obj=cls,
1110 id="postgres.multiple_primary_keys",
1111 )
1112 )
1113 return errors
1114
1115 @classmethod
1116 def _check_indexes(cls) -> list[PreflightResult]:
1117 """Check fields, names, and conditions of indexes."""
1118 errors: list[PreflightResult] = []
1119 references: set[str] = set()
1120 for index in cls.model_options.indexes:
1121 # Index name can't start with an underscore or a number
1122 if index.name[0] == "_" or index.name[0].isdigit():
1123 errors.append(
1124 PreflightResult(
1125 fix=f"The index name '{index.name}' cannot start with an underscore "
1126 "or a number.",
1127 obj=cls,
1128 id="postgres.index_name_invalid_start",
1129 ),
1130 )
1131 if len(index.name) > index.max_name_length:
1132 errors.append(
1133 PreflightResult(
1134 fix="The index name '%s' cannot be longer than %d " # noqa: UP031
1135 "characters." % (index.name, index.max_name_length),
1136 obj=cls,
1137 id="postgres.index_name_too_long",
1138 ),
1139 )
1140 if index.contains_expressions:
1141 for expression in index.expressions:
1142 references.update(
1143 ref[0] for ref in cls._get_expr_references(expression)
1144 )
1145 # Check fields referenced in indexes
1146 fields = [
1147 field
1148 for index in cls.model_options.indexes
1149 for field, _ in index.fields_orders
1150 ]
1151 fields += [
1152 include for index in cls.model_options.indexes for include in index.include
1153 ]
1154 fields += references
1155 errors.extend(cls._check_local_fields(fields, "indexes"))
1156 return errors
1157
1158 @classmethod
1159 def _check_local_fields(
1160 cls, fields: Iterable[str], option: str
1161 ) -> list[PreflightResult]:
1162 # In order to avoid hitting the relation tree prematurely, we use our
1163 # own fields_map instead of using get_field()
1164 forward_fields_map: dict[str, Field] = {}
1165 for field in cls._model_meta._get_fields(reverse=False):
1166 forward_fields_map[field.name] = field
1167 if hasattr(field, "attname"):
1168 forward_fields_map[field.attname] = field
1169
1170 errors: list[PreflightResult] = []
1171 for field_name in fields:
1172 try:
1173 field = forward_fields_map[field_name]
1174 except KeyError:
1175 errors.append(
1176 PreflightResult(
1177 fix=f"'{option}' refers to the nonexistent field '{field_name}'.",
1178 obj=cls,
1179 id="postgres.nonexistent_field_reference",
1180 )
1181 )
1182 else:
1183 from plain.postgres.fields.related import ManyToManyField
1184
1185 if isinstance(field, ManyToManyField):
1186 errors.append(
1187 PreflightResult(
1188 fix=f"'{option}' refers to a ManyToManyField '{field_name}', but "
1189 f"ManyToManyFields are not permitted in '{option}'.",
1190 obj=cls,
1191 id="postgres.m2m_field_in_meta_option",
1192 )
1193 )
1194 elif field not in cls._model_meta.local_fields:
1195 errors.append(
1196 PreflightResult(
1197 fix=f"'{option}' refers to field '{field_name}' which is not local to model "
1198 f"'{cls.model_options.object_name}'.",
1199 obj=cls,
1200 id="postgres.non_local_field_reference",
1201 )
1202 )
1203 return errors
1204
    @classmethod
    def _check_ordering(cls) -> list[PreflightResult]:
        """
        Check "ordering" option -- is it a list of strings and do all fields
        exist?
        """

        if not cls.model_options.ordering:
            return []

        if not isinstance(cls.model_options.ordering, list | tuple):
            return [
                PreflightResult(
                    fix="'ordering' must be a tuple or list (even if you want to order by "
                    "only one field).",
                    obj=cls,
                    id="postgres.ordering_not_tuple_or_list",
                )
            ]

        errors: list[PreflightResult] = []
        fields = cls.model_options.ordering

        # Skip expressions and '?' fields.
        fields = (f for f in fields if isinstance(f, str) and f != "?")

        # Convert "-field" to "field".
        fields = (f.removeprefix("-") for f in fields)

        # Separate related fields and non-related fields.
        _fields = []
        related_fields = []
        for f in fields:
            if LOOKUP_SEP in f:
                related_fields.append(f)
            else:
                _fields.append(f)
        fields = _fields

        # Check related fields: walk each lookup path one segment at a
        # time, hopping from model to model along forward relations.
        for field in related_fields:
            _cls = cls
            fld = None
            for part in field.split(LOOKUP_SEP):
                try:
                    fld = _cls._model_meta.get_field(part)  # ty: ignore[unresolved-attribute]
                    if isinstance(fld, RelatedField):
                        # Continue the walk on the related model.
                        _cls = fld.path_infos[-1].to_meta.model
                    else:
                        # Non-relational field reached: setting _cls to None
                        # makes the next segment raise AttributeError, which
                        # is then checked as a transform/lookup below.
                        _cls = None
                except (FieldDoesNotExist, AttributeError):
                    # The segment isn't a field on the current model. It is
                    # only valid if the previously resolved field accepts it
                    # as a transform or lookup.
                    if fld is None or (
                        not isinstance(fld, Field)
                        or (
                            fld.get_transform(part) is None
                            and fld.get_lookup(part) is None
                        )
                    ):
                        errors.append(
                            PreflightResult(
                                fix="'ordering' refers to the nonexistent field, "
                                f"related field, or lookup '{field}'.",
                                obj=cls,
                                id="postgres.ordering_nonexistent_field",
                            )
                        )

        # Check for invalid or nonexistent fields in ordering.
        invalid_fields = []

        # Any field name that is not present in field_names does not exist.
        # Also, ordering by m2m fields is not allowed.
        meta = cls._model_meta
        # Valid names are each concrete field's name and attname; for
        # auto-created non-concrete entries (reverse relations), the related
        # query name is the valid spelling instead.
        valid_fields = set(
            chain.from_iterable(
                (f.name, f.attname)
                if not (f.auto_created and not f.concrete)
                else (f.field.related_query_name(),)
                for f in chain(meta.fields, meta.related_objects)
            )
        )

        invalid_fields.extend(set(fields) - valid_fields)

        for invalid_field in invalid_fields:
            errors.append(
                PreflightResult(
                    fix="'ordering' refers to the nonexistent field, related "
                    f"field, or lookup '{invalid_field}'.",
                    obj=cls,
                    id="postgres.ordering_nonexistent_field",
                )
            )
        return errors
1299
1300 @classmethod
1301 def _check_long_column_names(cls) -> list[PreflightResult]:
1302 """
1303 Check that any auto-generated column names are shorter than the limits
1304 for each database in which the model will be created.
1305 """
1306 errors: list[PreflightResult] = []
1307
1308 # PostgreSQL has a 63-character limit on identifier names and doesn't
1309 # silently truncate, so we check for names that are too long
1310 allowed_len = MAX_NAME_LENGTH
1311
1312 for f in cls._model_meta.local_fields:
1313 column_name = f.column
1314
1315 # Check if column name is too long for the database.
1316 if column_name is not None and len(column_name) > allowed_len:
1317 errors.append(
1318 PreflightResult(
1319 fix=f'Column name too long for field "{column_name}". '
1320 f'Maximum length is "{allowed_len}" for the database.',
1321 obj=cls,
1322 id="postgres.column_name_too_long",
1323 )
1324 )
1325
1326 for f in cls._model_meta.local_many_to_many:
1327 # Skip nonexistent models.
1328 if isinstance(f.remote_field.through, str):
1329 continue
1330
1331 # Check if column name for the M2M field is too long for the database.
1332 for m2m in f.remote_field.through._model_meta.local_fields:
1333 rel_name = m2m.column
1334 if rel_name is not None and len(rel_name) > allowed_len:
1335 errors.append(
1336 PreflightResult(
1337 fix="Column name too long for M2M field "
1338 f'"{rel_name}". Maximum length is "{allowed_len}" for the database.',
1339 obj=cls,
1340 id="postgres.m2m_column_name_too_long",
1341 )
1342 )
1343
1344 return errors
1345
1346 @classmethod
1347 def _get_expr_references(cls, expr: Any) -> Iterator[tuple[str, ...]]:
1348 if isinstance(expr, Q):
1349 for child in expr.children:
1350 if isinstance(child, tuple):
1351 lookup, value = child
1352 yield tuple(lookup.split(LOOKUP_SEP))
1353 yield from cls._get_expr_references(value)
1354 else:
1355 yield from cls._get_expr_references(child)
1356 elif isinstance(expr, F):
1357 yield tuple(expr.name.split(LOOKUP_SEP))
1358 elif hasattr(expr, "get_source_expressions"):
1359 for src_expr in expr.get_source_expressions():
1360 yield from cls._get_expr_references(src_expr)
1361
    @classmethod
    def _check_constraints(cls) -> list[PreflightResult]:
        """Validate constraint definitions against the model's local fields.

        Gathers every field name referenced by UniqueConstraint fields,
        includes, conditions, and expressions, plus CheckConstraint
        conditions, then delegates the existence check to
        _check_local_fields(). Lookups that would require a JOIN are
        reported separately.
        """
        errors: list[PreflightResult] = []
        # Directly-listed columns of unique constraints (fields + includes).
        fields = set(
            chain.from_iterable(
                (*constraint.fields, *constraint.include)
                for constraint in cls.model_options.constraints
                if isinstance(constraint, UniqueConstraint)
            )
        )
        # Lookup paths referenced from Q conditions and expressions, stored
        # as tuples of path segments.
        references = set()
        for constraint in cls.model_options.constraints:
            if isinstance(constraint, UniqueConstraint):
                if isinstance(constraint.condition, Q):
                    references.update(cls._get_expr_references(constraint.condition))
                if constraint.contains_expressions:
                    for expression in constraint.expressions:
                        references.update(cls._get_expr_references(expression))
            elif isinstance(constraint, CheckConstraint):
                if isinstance(constraint.check, Q):
                    references.update(cls._get_expr_references(constraint.check))
                # NOTE(review): .flatten() is called unconditionally here,
                # which assumes constraint.check is always a Q -- confirm.
                if any(isinstance(expr, RawSQL) for expr in constraint.check.flatten()):
                    errors.append(
                        PreflightResult(
                            fix=f"Check constraint {constraint.name!r} contains "
                            f"RawSQL() expression and won't be validated "
                            f"during the model full_clean(). "
                            "Silence this warning if you don't care about it.",
                            warning=True,
                            obj=cls,
                            # NOTE(review): this id doesn't match the message
                            # (it refers to a name collision, not RawSQL) --
                            # confirm whether a dedicated id was intended.
                            id="postgres.constraint_name_collision_autogenerated",
                        ),
                    )
        for field_name, *lookups in references:
            fields.add(field_name)
            if not lookups:
                # If it has no lookups it cannot result in a JOIN.
                continue
            try:
                field = cls._model_meta.get_field(field_name)
                from plain.postgres.fields.related import ManyToManyField
                from plain.postgres.fields.reverse_related import ForeignKeyRel

                # Only forward relational fields proceed to the JOIN check
                # below; M2M and reverse FK references are skipped here.
                if (
                    not isinstance(field, RelatedField)
                    or isinstance(field, ManyToManyField)
                    or isinstance(field, ForeignKeyRel)
                ):
                    continue
            except FieldDoesNotExist:
                # Unknown names are reported by _check_local_fields below.
                continue
            # JOIN must happen at the first lookup.
            first_lookup = lookups[0]
            if (
                hasattr(field, "get_transform")
                and hasattr(field, "get_lookup")
                and field.get_transform(first_lookup) is None
                and field.get_lookup(first_lookup) is None
            ):
                errors.append(
                    PreflightResult(
                        fix=f"'constraints' refers to the joined field '{LOOKUP_SEP.join([field_name] + lookups)}'.",
                        obj=cls,
                        id="postgres.constraint_refers_to_joined_field",
                    )
                )
        errors.extend(cls._check_local_fields(fields, "constraints"))
        return errors
1430
1431
1432########
1433# MISC #
1434########
1435
1436
def model_unpickle(model_id: tuple[str, str] | type[Model]) -> Model:
    """Used to unpickle Model subclasses with deferred fields."""
    if isinstance(model_id, tuple):
        # (package_label, model_name) pair - resolve through the registry.
        model_cls = models_registry.get_model(*model_id)
    else:
        # Backwards compat - earlier versions pickled the class itself.
        model_cls = model_id
    # Bypass __init__; pickle restores the instance state afterwards.
    return model_cls.__new__(model_cls)


# Pickle protocol marker - functions don't normally have this attribute
model_unpickle.__safe_for_unpickle__ = True  # ty: ignore[unresolved-attribute]