import copy
import io
import re
import time
import uuid
from os.path import splitext
from typing import Hashable, Tuple, List, Iterable
from sqlalchemy.ext.mutable import MutableList, MutableDict
from PIL import Image as PilImage
from .constants import MB, KB
from .descriptors import AttachableDescriptor
from .exceptions import ThumbnailIsNotAvailableError
from .helpers import validate_width_height_ratio
from .stores import StoreManager, Store
from .typing_ import Attachable, Dimension
[docs]class Attachment(MutableDict):
"""
The base model for an attached file.
All attachment types will be inherited from this class.
Actually this is an instance of :class:`sqlalchemy.ext.mutable.MutableDict`
which inherited from :class:`dict`.
.. doctest::
>>> from sqlalchemy_media import Attachment
>>> print(Attachment(key1='Value1'))
{'key1': 'Value1'}
This object should be used inside a :class:`.StoreManager` context.
.. versionchanged:: 0.5
- removed ``__analyzer__`` attribute, using ``__pre_processors__``
instead.
- removed ``__validate__`` attribute, using ``__pre_processors__``
instead.
.. versionadded:: 0.9.6
- ``reproducible``
"""
#: The directory name of the file.
__directory__ = 'attachments'
#: The prefix to be prepended an the name of the file.
__prefix__ = 'attachment'
#: Limit the file's maximum size.
__max_length__ = None
#: Limit the file's minimum size.
__min_length__ = None
#: An instance of :class:`.Processor`, to convert, reformat & change
# contents before storing the attachment.
__pre_processors__ = None
#: Automatically coerce `:obj:.Attachable` objects. So if True, you can
# set the models attribute by a `file` or `filename` or
# :class:`cgi.FieldStorage`.
__auto_coercion__ = False
#: The reproducible of the file.
__reproducible__ = False
#: It allows to customize the Descriptor.
__descriptor__ = AttachableDescriptor
@classmethod
def _listen_on_attribute(cls, attribute, coerce, parent_cls):
StoreManager.observe_attribute(attribute)
super()._listen_on_attribute(attribute, coerce, parent_cls)
[docs] @classmethod
def coerce(cls, key, value) -> 'Attachment':
"""
Converts plain dictionary to instance of this class.
.. seealso:: :meth:`sqlalchemy.ext.mutable.MutableDict.coerce`
"""
if value is None or isinstance(value, (cls, dict)):
return super().coerce(key, value)
if cls.__auto_coercion__:
if not isinstance(value, (tuple, list)):
value = (value, )
return cls.create_from(*value)
raise TypeError(
'Value type must be subclass of % s or a '
'tuple(file, mime, [filename]), but it\'s: %s' % (
cls, type(value)
))
[docs] @classmethod
def create_from(cls, *args, **kwargs):
"""
Factory method to create and attach file with the same action.
:param args: The same as the :meth:`.attach`
:param kwargs: The same as the :meth:`.attach`
:return: The loaded instance of this class.
"""
instance = cls()
return instance.attach(*args, **kwargs)
def __hash__(self) -> int:
"""
Returns the unique hash of this attachment based on :attr:`.key`
"""
return hash(self.key)
@property
def store_id(self) -> str:
"""
Returns the id of the store used to put this file on.
Stores must be registered with appropriate id via
:meth:`.StoreManager.register`.
:type: str
"""
return self.get('store_id')
@property
def key(self) -> Hashable:
"""
Unique key for tracking this attachment. it will be generated during
attachment process in
:meth:`.attach` method.
:type: Hashable
"""
return self.get('key')
@key.setter
def key(self, value) -> None:
self['key'] = value
@property
def empty(self) -> bool:
"""
Check if file is attached to this object or not. Returns
:const:`True` when a file is loaded on this object via :meth:`.attach`
method or SqlAlchemy db load mechanism, else :const:`False.`
:type: bool
"""
return self.key is None
@property
def path(self) -> str:
"""
Relative Path of the file used to store and locate the file.
:type: str
"""
return '%s/%s' % (self.__directory__, self.filename)
@property
def filename(self) -> str:
"""
The filename used to store the attachment in the storage with this
format::
'{self.__prefix__}-{self.key}{self.suffix}{if self.extension
else ''}'
:type: str
"""
return '%s-%s%s%s' % (self.__prefix__, self.key, self.suffix,
self.extension if self.extension else '')
@property
def suffix(self) -> str:
"""
The same as the
:meth:`sqlalchemy_media.attachments.Attachment.original_filename`
plus a leading minus(-) if available, else empty string ('') will be
returned.
:type: str
"""
if self.original_filename:
return '-%s' % re.sub(
r'[/:.?]+',
'_',
re.sub(
r'\w+://',
'',
splitext(self.original_filename)[0]
)
)
return ''
@property
def extension(self) -> str:
"""
File extension.
:type: str
"""
return self.get('extension')
@property
def content_type(self) -> str:
"""
file Content-Type
:type: str
"""
return self.get('content_type')
@property
def original_filename(self) -> str:
"""
Original file name, it may be provided by user within
:attr:`cgi.FieldStorage.filename`, url or Physical filename.
:type: str
"""
return self.get('original_filename')
@property
def length(self) -> int:
"""
The length of the attached file in bytes.
:type: int
"""
return int(self.get('length'))
@property
def timestamp(self):
"""
The unix-time of the attachment creation.
:type: str
"""
return self.get('timestamp')
@timestamp.setter
def timestamp(self, v: [str, float]):
self['timestamp'] = str(v) if not isinstance(v, str) else v
@property
def reproducible(self) -> bool:
"""
The reproducible of the file.
:type: bool
"""
return self.get('reproducible', False)
[docs] def copy(self) -> 'Attachment':
"""
Copy this object using deepcopy.
"""
return self.__class__(copy.deepcopy(self))
[docs] def get_store(self) -> Store:
"""
Returns the :class:`sqlalchemy_media.stores.Store` instance, which
this file is stored on.
"""
store_manager = StoreManager.get_current_store_manager()
return store_manager.get(self.store_id)
[docs] def delete(self) -> None:
"""
Deletes the file.
.. warning:: This operation can not be roll-backed.So if you want to
delete a file, just set it to :const:`None` or set it by
new :class:`.Attachment` instance, while passed
``delete_orphan=True`` in :class:`.StoreManager`.
"""
self.get_store().delete(self.path)
[docs] def attach(
self,
attachable: Attachable,
content_type: str = None,
original_filename: str = None,
extension: str = None,
store_id: str = None,
overwrite: bool = False,
suppress_pre_process: bool = False,
suppress_validation: bool = False,
**kwargs) -> 'Attachment':
"""
Attach a file. if the session is rolled-back, all operations will be
rolled-back. The old file will be deleted after commit, if any.
Workflow::
+--------+
| Start |
+---+----+
|
+----------v-----------+
| Wrap with Descriptor <----+
+----------+-----------+ |
| |
+----------v-----------+ |
| Nothing or Analyze | |
+----------+-----------+ |
| |
+----------v-----------+ |
| Nothing or Validate | |
+----------+-----------+ |
| |
+----------v-----------+ |
|Nothing or Pre Process+----+
+------+---------------+
|
+----------+-----------+
| |
+------v---------+ +---------v------+
| Store in DB | |Store In Storage|
+------+---------+ +---------+------+
| |
+----------+-----------+
|
|
+---v----+
| Finish |
+--------+
:param attachable: file-like object, filename or URL to attach.
:param content_type: If given, the content-detection is suppressed.
:param original_filename: Original name of the file, if available, to append to the end of
the the filename, useful for SEO, and readability.
:param extension: The file's extension, is available.else, tries to guess it by
content_type
:param store_id: The store id to store this file on. Stores must be registered with
appropriate id via :meth:`sqlalchemy_media.stores.StoreManager.register`.
:param overwrite: Overwrites the file without changing it's unique-key and name, useful to
prevent broken links.
Currently, when using this option, Rollback function is not available,
because the old file will be overwritten by the given new one.
:param suppress_pre_process: When is :data:`.True`, ignores the pre-processing phase,
during attachment.
:param suppress_validation: When is :data:`.True`, ignores the validation phase,
during attachment.
:param kwargs: Additional metadata to be stored in backend.
.. note:: :exc:`.MaximumLengthIsReachedError` and or :exc:`.MinimumLengthIsNotReachedError`
may be raised.
.. warning:: This operation can not be rolled-back, if ``overwrite=True`` given.
.. versionchanged:: 0.1
- This method will return the self. it's useful to chain method calls on the object
within a single line.
- Additional ``kwargs`` are accepted to be stored in database alongside the file's
metadata.
.. versionchanged:: 0.5
- ``suppress_pre_process`` argument.
- ``suppress_validation`` argument.
- pre-processing phase.
"""
# Wrap in AttachableDescriptor
descriptor = self.__descriptor__(
attachable,
content_type=content_type,
original_filename=original_filename,
extension=extension,
max_length=self.__max_length__,
min_length=self.__min_length__,
reproducible=self.__reproducible__
)
try:
# Backup the old key and filename if exists
if overwrite:
old_attachment = None
else:
old_attachment = None if self.empty else self.copy()
self.key = str(uuid.uuid4())
# Store information from descriptor
attachment_info = kwargs.copy()
attachment_info.update(
original_filename=descriptor.original_filename,
extension=descriptor.extension,
content_type=descriptor.content_type,
length=descriptor.content_length,
store_id=store_id,
reproducible=descriptor.reproducible
)
# Pre-processing
if self.__pre_processors__:
if isinstance(self.__pre_processors__, Iterable):
processors = self.__pre_processors__
else:
processors = [self.__pre_processors__]
for processor in processors:
processor.process(descriptor, attachment_info)
# Updating the mutable dictionary
self.update([(k, v)
for k, v in attachment_info.items() if v is not None])
# Putting the file on the store.
self['length'] = self.get_store().put(self.path, descriptor)
self.timestamp = time.time()
store_manager = StoreManager.get_current_store_manager()
store_manager.register_to_delete_after_rollback(self)
if old_attachment:
store_manager.register_to_delete_after_commit(old_attachment)
self.pop('thumbnails', None)
except BaseException:
descriptor.close(check_length=False)
raise
else:
descriptor.close()
return self
[docs] def locate(self) -> str:
"""
Locates the file url.
"""
store = self.get_store()
return '%s?_ts=%s' % (store.locate(self), self.timestamp)
[docs] def get_objects_to_delete(self):
"""
Returns the files to be deleted, if the attachment is marked for deletion.
"""
yield self
[docs] def get_orphaned_objects(self):
"""
this method will be always called by the store when adding the ``self`` to the orphaned
list. so subclasses of the :class:`.Attachment` has a chance to add other related objects
into the orphaned list and schedule it for delete. for example the :class:`.Image` class
can schedule it's thumbnails for deletion also.
:return: An iterable of :class:`.Attachment` to mark as orphan.
.. versionadded:: 0.11.0
"""
return iter([])
[docs]class AttachmentCollection(object):
"""
Mixin to make a mutable iterator as a collection of :class:`.Attachment`.
"""
#: Type of items
__item_type__ = Attachment
@classmethod
def _listen_on_attribute(cls, attribute, coerce, parent_cls):
StoreManager.observe_attribute(attribute, collection=True)
super()._listen_on_attribute(attribute, coerce, parent_cls)
[docs]class AttachmentList(AttachmentCollection, MutableList):
"""
Used to create a collection of :class:`.Attachment`
::
class MyList(AttachmentList):
__item_type__ = MyAttachment
class Person(BaseModel):
__tablename__ = 'person'
id = Column(Integer, primary_key=True)
files = Column(MyList.as_mutable(Json))
me = Person()
me.files = MyList()
me.files.append(MyAttachment.create_from(any_file))
"""
[docs] def observe_item(self, item):
"""
A simple monkeypatch to instruct the children to notify the parent if
contents are changed:
From `sqlalchemy mutable documentation: <http://docs.sqlalchemy.org/en
/latest/orm/extensions/mutable.html#sqlalchemy.ext.mutable.MutableLis
t>`_
Note that MutableList does not apply mutable tracking to the values
themselves inside the list. Therefore it is not a sufficient
solution for the use case of tracking deep changes to a recursive
mutable structure, such as a JSON structure. To support this
use case, build a subclass of MutableList that provides appropriate
coercion to the values placed in the dictionary so that they too
are “mutable”, and emit events up to their parent structure.
:param item: The item to observe
:return:
"""
item = self.__item_type__.coerce(None, item)
item._parents = self._parents
return item
[docs] @classmethod
def coerce(cls, index, value):
if not isinstance(value, cls):
if isinstance(value, Iterable):
result = cls()
for i in value:
result.append(cls.__item_type__.coerce(index, i))
return result
return super().coerce(index, value)
else:
return value
[docs] def append(self, x):
super().append(self.observe_item(x))
StoreManager.get_current_store_manager().adopted(x)
[docs] def remove(self, i):
super().remove(i)
StoreManager.get_current_store_manager().orphaned(i)
[docs] def pop(self, *args):
i = super().pop(*args)
StoreManager.get_current_store_manager().orphaned(i)
return i
[docs] def extend(self, x):
StoreManager.get_current_store_manager().adopted(*x)
super().extend([self.observe_item(i) for i in x])
[docs] def insert(self, i, x):
StoreManager.get_current_store_manager().adopted(x)
super().insert(i, self.observe_item(x))
[docs] def clear(self):
StoreManager.get_current_store_manager().orphaned(*self)
super().clear()
def __delitem__(self, index):
StoreManager.get_current_store_manager().orphaned(self[index])
super().__delitem__(index)
def __setitem__(self, index, value):
store_manager = StoreManager.get_current_store_manager()
if isinstance(index, slice):
for old_value in self[index]:
if old_value:
store_manager.orphaned(old_value)
for new_item in value:
store_manager.adopted(new_item)
super().__setitem__(index, [self.observe_item(i) for i in value])
else:
old_value = self[index]
if old_value:
store_manager.orphaned(old_value)
value = self.observe_item(value)
store_manager.adopted(value)
super().__setitem__(index, value)
[docs]class AttachmentDict(AttachmentCollection, MutableDict):
"""
Used to create a dictionary of :class:`.Attachment`
::
class MyDict(AttachmentDict):
__item_type__ = MyAttachment
class Person(BaseModel):
__tablename__ = 'person'
id = Column(Integer, primary_key=True)
files = Column(MyDict.as_mutable(Json))
me = Person()
me.files = MyDict()
me.files['original'] = MyAttachment.create_from(any_file)
"""
[docs] @classmethod
def coerce(cls, index, value):
if not isinstance(value, cls):
if isinstance(value, dict) and not isinstance(value, Attachment):
result = cls()
for k, v in value.items():
result[k] = cls.__item_type__.coerce(k, v)
return result
return super().coerce(index, value)
else:
return value
[docs] def setdefault(self, key, value):
StoreManager.get_current_store_manager().adopted(value)
return super().setdefault(key, value)
[docs] def update(self, *a, **kw):
StoreManager.get_current_store_manager().adopted(kw.values())
super().update(*a, **kw)
[docs] def pop(self, *args):
i = super().pop(*args)
StoreManager.get_current_store_manager().orphaned(i)
return i
[docs] def popitem(self):
k, v = super().popitem()
StoreManager.get_current_store_manager().orphaned(v)
return k, v
[docs] def clear(self):
StoreManager.get_current_store_manager().orphaned(*self.values())
super().clear()
def __delitem__(self, key):
StoreManager.get_current_store_manager().orphaned(self[key])
super().__delitem__(key)
def __setitem__(self, key, value):
StoreManager.get_current_store_manager().adopted(value)
super().__setitem__(key, value)
[docs]class File(Attachment):
"""
Representing an attached file. Normally if you want to store any file,
this class is the best choice.
"""
__directory__ = 'files'
__prefix__ = 'file'
__max_length__ = 2 * MB
__min_length__ = 0
[docs]class FileList(AttachmentList):
"""
Equivalent to
::
class FileList(AttachmentList):
__item_type__ = File
"""
__item_type__ = File
[docs]class FileDict(AttachmentDict):
"""
Equivalent to
::
class FileDict(AttachmentDict):
__item_type__ = File
"""
__item_type__ = File
[docs]class BaseImage(File):
"""
Base class for all images.
"""
[docs] def attach(self, *args, dimension: Dimension = None, **kwargs):
"""
A new overload for :meth:`.Attachment.attach`, which accepts one
additional argument: ``dimension``.
:param args: The same as the: :meth:`.Attachment.attach`.
:param dimension: Image (width, height).
:param kwargs: The same as the: :meth:`.Attachment.attach`.
:return: The same as the: :meth:`.Attachment.attach`.
"""
if dimension:
kwargs['width'], kwargs['height'] = dimension
return super().attach(*args, **kwargs)
@property
def width(self):
return self.get('width')
@property
def height(self):
return self.get('height')
[docs]class Thumbnail(BaseImage):
"""
Representing an image thumbnail.
You may use :meth:`.generate_thumbnail` and or :meth:`.get_thumbnail`
with ``auto_generate=True`` to get one.
"""
__directory__ = 'thumbnails'
__prefix__ = 'thumbnail'
[docs]class Image(BaseImage):
"""
Equivalent to
::
class Image(Attachment):
__directory__ = 'images'
__prefix__ = 'image'
__max_length__ = 2 * MB
__min_length__ = 4 * KB
"""
__directory__ = 'images'
__prefix__ = 'image'
__max_length__ = 2 * MB
__min_length__ = 4 * KB
#: It allows to customize the type of thumbnail images.
__thumbnail_type__ = Thumbnail
@property
def thumbnails(self) -> List[Tuple[int, int, float, Thumbnail]]:
"""
A ``List[Tuple[int, int, float, Thumbnail]]``, to hold thumbnails.
You may use :meth:`.generate_thumbnail` and or :meth:`.get_thumbnail`
with ``auto_generate=True`` to fill it.
"""
return self.get('thumbnails')
[docs] def generate_thumbnail(
self,
width: int = None,
height: int = None,
ratio: float = None,
ratio_precision: int = 5) -> Thumbnail:
"""
.. versionadded:: 0.3
Generates and stores a thumbnail with the given arguments.
.. warning:: If none or more than one of the ``width``, ``height``
and or ``ratio`` are given, :exc:`ValueError` will be
raised.
:param width: The width of the thumbnail.
:param height: The Height of the thumbnail.
:param ratio: The coefficient to reduce, Must be less than ``1.0``.
:param ratio_precision: Number of digits after the decimal point of
the ``ratio`` argument to tune thumbnail
lookup precision. default: 2.
:return: the Newly generated :class:`.Thumbnail` instance.
"""
# Validating parameters
width, height, ratio = validate_width_height_ratio(
width, height, ratio
)
# opening the original file
thumbnail_buffer = io.BytesIO()
store = self.get_store()
format_ = 'jpeg'
extension = '.jpg'
with store.open(self.path) as original_file:
# generating thumbnail and storing in buffer
img = PilImage.open(original_file)
# JPEG does not handle Alpha channel, switch to PNG
if img.mode == 'RGBA':
format_ = 'png'
extension = '.png'
with img:
original_size = img.size
if callable(width):
width = width(original_size)
if callable(height):
height = height(original_size)
width = int(width)
height = int(height)
thumbnail_image = img.resize((width, height))
thumbnail_image.save(thumbnail_buffer, format_)
thumbnail_buffer.seek(0)
if self.thumbnails is None:
self['thumbnails'] = []
ratio = round(width / original_size[0], ratio_precision)
thumbnail = self.__thumbnail_type__.create_from(
thumbnail_buffer,
content_type=f'image/{format_}',
extension=extension,
dimension=(width, height)
)
self.thumbnails.append((width, height, ratio, thumbnail))
return thumbnail
[docs] def get_thumbnail(
self,
width: int = None,
height: int = None,
ratio: float = None,
ratio_precision: int = 2,
auto_generate: bool = False) -> Thumbnail:
"""
Search for the thumbnail with given arguments, if ``auto_generate``
is :data:`.False`, the :exc:`.ThumbnailIsNotAvailableError` will be
raised, else tries to call the :meth:`generate_thumbnail` to create a
new one.
:param width: Width of the thumbnail to search for.
:param height: Height of the thumbnail to search for.
:param ratio: Ratio of the thumbnail to search for.
:param ratio_precision: Number of digits after the decimal point of
the ``ratio`` argument to tune thumbnail
lookup precision. default: 2.
:param auto_generate: If :data:`.True`, tries to generate a new
thumbnail.
:return: :class:`.Thumbnail` instance.
.. warning:: if ``auto_generate`` is :data:`.True`, you have to commit
the session, to store the generated thumbnails.
"""
if ratio:
ratio = round(ratio, ratio_precision)
if self.thumbnails is not None:
for w, h, r, t in self.thumbnails:
if w == width or h == height or round(
r, ratio_precision) == ratio:
return self.__thumbnail_type__(t)
# thumbnail not found
if auto_generate:
return self.generate_thumbnail(width, height, ratio)
else:
raise ThumbnailIsNotAvailableError(
f'Thumbnail is not available with these criteria:'
f'width={width} height={height} ratio={ratio}'
)
[docs] def get_objects_to_delete(self):
"""
Returns the files to be deleted, if the attachment is marked for
deletion.
"""
yield from super().get_objects_to_delete()
if self.thumbnails:
for t in self.thumbnails:
yield self.__thumbnail_type__(**t[3])
[docs] def get_orphaned_objects(self):
"""
Mark thumbnails for deletion when the :class:`.Image` is being deleted.
:return: An iterable of :class:`.Thumbnail` to mark as orphan.
"""
if not self.thumbnails:
return
for t in self.thumbnails:
yield self.__thumbnail_type__(t[3])
[docs]class ImageList(AttachmentList):
"""
Used to create a collection of :class:`.Image`.
"""
__item_type__ = Image