Source code for sqlalchemy_media.stores.os2

from io import BytesIO

# Importing optional stuff required by http based store
try:
    import requests
except ImportError:  # pragma: no cover
    requests = None


# Importing optional stuff required by OS2 store
try:
    from aliyunauth import OssAuth as OS2Auth
except ImportError:  # pragma: no cover
    OS2Auth = None


from sqlalchemy_media.exceptions import OS2Error
from sqlalchemy_media.optionals import ensure_os2auth
from sqlalchemy_media.typing_ import FileLike
from .base import Store


[docs]class OS2Store(Store): """ Store for dealing with oss of aliyun """ base_url = 'https://{0}.oss-{1}.aliyuncs.com' DEFAULT_MAX_AGE = 60 * 60 * 24 * 365 def __init__(self, bucket: str, access_key: str, secret_key: str, region: str, max_age: int = DEFAULT_MAX_AGE, base_headers: dict = None, prefix: str = None, base_url: str = None, cdn_url: str = None, cdn_prefix_ignore: bool = False, acl: str = 'private'): self.bucket = bucket self.access_key = access_key self.secret_key = secret_key self.region = region self.max_age = max_age self.prefix = prefix self.acl = acl self.base_headers = base_headers or {} if base_url: self.base_url = base_url else: self.base_url = self.base_url.format(bucket, region) if prefix: self.base_url = '%s/%s' % (self.base_url, prefix) if cdn_url and not cdn_prefix_ignore: cdn_url = '%s/%s' % (cdn_url, prefix) if self.base_url.endswith('/'): self.base_url = self.base_url.rstrip('/') if cdn_url and cdn_url.endswith('/'): cdn_url = cdn_url.rstrip('/') self.cdn_url = cdn_url def _get_os2_url(self, filename: str): return '{0}/{1}'.format(self.base_url, filename) def _upload_file(self, url: str, data: str, content_type: str): ensure_os2auth() auth = OS2Auth(self.bucket, self.access_key, self.secret_key) headers = self.base_headers.copy() headers.update({ 'Cache-Control': 'max-age=' + str(self.max_age), 'x-oss-object-acl': self.acl }) if content_type: headers['Content-Type'] = content_type res = requests.put(url, auth=auth, data=data, headers=headers) if not 200 <= res.status_code < 300: raise OS2Error(res.text)
[docs] def put(self, filename: str, stream: FileLike): url = self._get_os2_url(filename) data = stream.read() content_type = getattr(stream, 'content_type', None) self._upload_file(url, data, content_type) return len(data)
[docs] def delete(self, filename: str): ensure_os2auth() url = self._get_os2_url(filename) auth = OS2Auth(self.bucket, self.access_key, self.secret_key) headers = self.base_headers.copy() res = requests.delete(url, auth=auth, headers=headers) if not 200 <= res.status_code < 300: raise OS2Error(res.text)
[docs] def open(self, filename: str, mode: str = 'rb') -> FileLike: ensure_os2auth() url = self._get_os2_url(filename) auth = OS2Auth(self.bucket, self.access_key, self.secret_key) headers = self.base_headers.copy() res = requests.get(url, auth=auth, headers=headers) if not 200 <= res.status_code < 300: raise OS2Error(res.text) return BytesIO(res.content)
[docs] def locate(self, attachment) -> str: if self.cdn_url: base_url = self.cdn_url else: base_url = self.base_url return '%s/%s' % (base_url, attachment.path)