# Source code for panoptes_client.subject

from panoptes_client.subject_workflow_status import SubjectWorkflowStatus

# String types of the running interpreter, captured *before*
# ``from builtins import str`` below rebinds ``str`` (on Python 2 that
# name may then refer to a different type). On Python 3 ``unicode`` does
# not exist, so this is simply ``(str,)``.
try:
    _OLD_STR_TYPES = (str, unicode)  # noqa: F821 -- Python 2 only
except NameError:
    _OLD_STR_TYPES = (str,)

from builtins import range, str

import logging
import requests
import threading
import time

from copy import deepcopy
from concurrent.futures import ThreadPoolExecutor
import mimetypes

# Use python-magic for MIME detection when it imports cleanly; otherwise fall
# back to the stdlib ``mimetypes`` module, which works from file extensions.
try:
    import magic
except ImportError:
    import importlib.metadata

    try:
        importlib.metadata.version("python-magic")
    except importlib.metadata.PackageNotFoundError:
        # python-magic is not installed at all; nothing worth reporting.
        pass
    else:
        # The package is installed but failed to import (per the message
        # below, typically because the libmagic shared library is missing).
        logging.getLogger('panoptes_client').info(
            'libmagic not operational, likely due to lack of shared libraries. '
            'Media MIME type determination will be based on file extensions.'
        )
    MEDIA_TYPE_DETECTION = 'mimetypes'
else:
    MEDIA_TYPE_DETECTION = 'magic'

from panoptes_client.panoptes import (
    LinkResolver,
    ObjectNotSavedException,
    Panoptes,
    PanoptesAPIException,
    PanoptesObject,
)
from redo import retry

# ``attempts`` passed to ``redo.retry`` for subject saves and media uploads.
UPLOAD_RETRY_LIMIT = 5
# ``sleeptime`` passed to ``redo.retry`` between attempts.
RETRY_BACKOFF_INTERVAL = 5
# Worker count for the ThreadPoolExecutors used for async saves and uploads.
ASYNC_SAVE_THREADS = 5

# MIME types accepted for subject locations and attached media; any other
# type is rejected with UnknownMediaException.
ALLOWED_MIME_TYPES = [
    # Images
    "image/jpeg", "image/png", "image/gif", "image/svg+xml",
    # Audio and video
    "audio/mpeg", "video/mp4", "audio/mp4", "video/mpeg",
    # Text and structured data
    "text/plain", "application/json",
]

class Subject(PanoptesObject):
    """A Panoptes subject (API slug ``subjects``).

    Editable attributes are ``locations``, ``metadata`` and the ``project``
    link. Local media added with :py:meth:`add_location` is uploaded when the
    subject is saved.
    """

    _api_slug = 'subjects'
    _link_slug = 'subjects'
    _edit_attributes = (
        'locations',
        'metadata',
        {
            'links': (
                'project',
            ),
        },
    )
    # Thread-local holder for the executor installed by ``async_saves()``
    # (stored as ``_local.save_exec``).
    _local = threading.local()

    @classmethod
    def async_saves(cls):
        """
        Returns a context manager to allow asynchronously creating subjects
        or creating and uploading subject attached images/media.

        Using this context manager will create a pool of threads which will
        create multiple subjects at once and upload any local files
        simultaneously.

        The recommended way to use this is with the `with` statement::

            with Subject.async_saves():
                local_files = [...]
                for filename in local_files:
                    s = Subject()
                    s.links.project = 1234
                    s.add_location(filename)
                    s.save()

            with Subject.async_saves():
                local_files = [...]
                for filename in local_files:
                    s = Subject(1234)
                    s.save_attached_image(filename)

        Alternatively, you can manually shut down the thread pool::

            pool = Subject.async_saves()
            local_files = [...]
            try:
                for filename in local_files:
                    s = Subject()
                    s.links.project = 1234
                    s.add_location(filename)
                    s.save()
            finally:
                pool.shutdown()

        The pool is stored thread-locally, so it only affects saves made from
        the thread that called this method. Once the pool has been shut down,
        :py:meth:`save` and :py:meth:`save_attached_image` discard it and
        work synchronously.
        """
        cls._local.save_exec = ThreadPoolExecutor(
            max_workers=ASYNC_SAVE_THREADS
        )
        return cls._local.save_exec

    def __init__(self, raw={}, etag=None):
        # ``raw`` is forwarded unchanged to PanoptesObject.__init__ (the
        # mutable default mirrors the base signature and is not mutated here).
        super(Subject, self).__init__(raw, etag)
        if not self.locations:
            self.locations = []
        if not self.metadata:
            self.metadata = {}
            # Only reset the change-detection baseline when there is no
            # metadata; otherwise keep the snapshot ``set_raw`` takes of
            # loaded metadata (presumably invoked by the base constructor
            # -- confirm) so unchanged metadata is not re-sent on save.
            self._original_metadata = {}
        # One slot per location: raw bytes awaiting upload, or None.
        self._media_files = [None] * len(self.locations)

    def save(self, client=None):
        """
        Like :py:meth:`.PanoptesObject.save`, but also uploads any local files
        which have previously been added to the subject with
        :py:meth:`add_location`. Automatically retries uploads on error.

        If multiple local files are to be uploaded, several files will be
        uploaded simultaneously to save time.

        Inside an :py:meth:`async_saves` block the save is queued on the
        shared pool and this method returns immediately; use
        :py:attr:`async_save_result` to check the outcome.

        If a media upload still fails after all retries, its exception (e.g.
        a :py:class:`requests.exceptions.RequestException`) is raised once
        every other upload has finished.
        """
        if not client:
            client = Panoptes.client()

        with client:
            if hasattr(self._local, 'save_exec'):
                try:
                    # The queued call runs in a worker thread, where the
                    # thread-local ``save_exec`` is undefined, so it takes
                    # the synchronous path below.
                    self._async_future = self._local.save_exec.submit(
                        self.save,
                        client=client,
                    )
                    return
                except RuntimeError:
                    # The pool has been shut down (e.g. its ``async_saves``
                    # block has exited): forget it and save synchronously.
                    del self._local.save_exec

            if self.metadata != self._original_metadata:
                self.modified_attributes.add('metadata')

            response = retry(
                super(Subject, self).save,
                attempts=UPLOAD_RETRY_LIMIT,
                sleeptime=RETRY_BACKOFF_INTERVAL,
                retry_exceptions=(PanoptesAPIException,),
                log_args=False,
            )

            if not response:
                return

            self._upload_pending_media(response['subjects'][0]['locations'])

    def _upload_pending_media(self, saved_locations):
        """Upload queued local media to the URLs returned by the API.

        ``saved_locations`` is the ``locations`` list from the save response.
        Each entry is paired positionally with ``self._media_files`` and maps
        a media type to the URL that media should be uploaded to. Blocks until
        all uploads finish, then re-raises the first (in submission order)
        that still failed after retries.
        """
        upload_exec = ThreadPoolExecutor(max_workers=ASYNC_SAVE_THREADS)
        uploads = []
        try:
            for location, media_data in zip(
                saved_locations,
                self._media_files,
            ):
                if not media_data:
                    # Remote (dict) location: nothing to upload.
                    continue

                for media_type, url in location.items():
                    uploads.append(upload_exec.submit(
                        retry,
                        self._upload_media,
                        args=(url, media_data, media_type),
                        attempts=UPLOAD_RETRY_LIMIT,
                        sleeptime=RETRY_BACKOFF_INTERVAL,
                        retry_exceptions=(
                            requests.exceptions.RequestException,
                        ),
                        log_args=False,
                    ))

            self._media_files = [None] * len(self.locations)
        finally:
            upload_exec.shutdown(wait=True)

        # Surface failures instead of silently dropping them.
        for upload in uploads:
            upload.result()

    def _upload_media(self, url, media_data, media_type):
        """PUT ``media_data`` to ``url`` and return the response.

        Raises :py:class:`requests.HTTPError` on a non-success status. The
        ``x-ms-blob-type`` header suggests ``url`` is an Azure Blob Storage
        upload URL -- confirm against the API.
        """
        upload_response = requests.put(
            url,
            headers={
                'Content-Type': media_type,
                'x-ms-blob-type': 'BlockBlob',
            },
            data=media_data,
        )
        upload_response.raise_for_status()
        return upload_response

    def _detect_media_type(self, media_data=None, manual_mimetype=None,
                           file_name=None):
        """Return the MIME type of a media item.

        ``manual_mimetype`` wins when given. Otherwise the type is detected
        from the ``media_data`` bytes with libmagic, or, when libmagic is
        unavailable, guessed from the extension of ``file_name`` with
        :py:mod:`mimetypes` (which cannot inspect raw bytes).

        In the ``mimetypes`` fallback, raises :py:class:`UnknownMediaException`
        when no file name is available or its type cannot be guessed.
        """
        if manual_mimetype is not None:
            return manual_mimetype

        if MEDIA_TYPE_DETECTION == 'magic':
            return magic.from_buffer(media_data, mime=True)

        media_type = None
        if isinstance(file_name, (str,) + _OLD_STR_TYPES):
            media_type = mimetypes.guess_type(file_name)[0]
        if not media_type:
            raise UnknownMediaException(
                'Could not detect file type. Please try installing '
                'libmagic: https://panoptes-python-client.readthedocs.'
                'io/en/latest/user_guide.html#uploading-non-image-'
                'media-types'
            )
        return media_type

    def _validate_media_type(self, media_type=None):
        """Raise :py:class:`UnknownMediaException` unless ``media_type`` is
        one of ``ALLOWED_MIME_TYPES``."""
        if media_type not in ALLOWED_MIME_TYPES:
            raise UnknownMediaException(f"File type {media_type} is not allowed.")

    @property
    def async_save_result(self):
        """
        Retrieves the result of this subject's asynchronous save.

        - Returns `True` if the subject was saved successfully.
        - Raises `concurrent.futures.CancelledError` if the save was cancelled.
        - If the save failed, raises the relevant exception.
        - Returns `False` if the subject hasn't finished saving or if the
          subject has not been queued for asynchronous save.
        """
        if hasattr(self, "_async_future") and self._async_future.done():
            self._async_future.result()
            return True
        else:
            return False

    @property
    def attached_images(self):
        """
        A dict containing attached images/media of a subject. This should NOT
        be confused with subject locations.

        A subject_location is a media record that saves the location of the
        media that will be classified in a project's classifier.

        A subject_attached_image is a media record that serves as
        ancillary/auxiliary media to the subject and will be shown on a
        subject's Talk page.

        Raises :py:class:`ObjectNotSavedException` if the subject has no id.
        """
        if self.id is None:
            raise ObjectNotSavedException
        return self.http_get('{}/attached_images'.format(self.id))[0]

    def set_raw(self, raw, etag=None, loaded=True):
        """Like :py:meth:`.PanoptesObject.set_raw`, but also snapshots the
        loaded metadata so :py:meth:`save` can tell whether it changed."""
        super(Subject, self).set_raw(raw, etag, loaded)
        if loaded:
            # Always refresh the baseline on load, including when metadata
            # is empty, so a reload never leaves a stale snapshot behind.
            self._original_metadata = deepcopy(self.metadata or {})

    def subject_workflow_status(self, workflow_id):
        """
        Returns SubjectWorkflowStatus of Subject in Workflow

        Example::

            subject.subject_workflow_status(4321)
        """
        return next(SubjectWorkflowStatus.where(
            subject_id=self.id, workflow_id=workflow_id
        ))

    def add_location(self, location, manual_mimetype=None):
        """
        Add a media location to this subject.

        - **location** can be an open :py:class:`file` object, a path to a
          local file, or a :py:class:`dict` containing MIME types and URLs
          for remote media.
        - **manual_mimetype** optional, passes in a specific MIME type for
          media item.

        A local file is read immediately and closed afterwards (including a
        file object passed in by the caller); its bytes are uploaded on the
        next :py:meth:`save`.

        Examples::

            subject.add_location(my_file)
            subject.add_location('/data/image.jpg')
            subject.add_location({'image/png': 'https://example.com/image.png'})
            subject.add_location(my_file, manual_mimetype='image/png')
        """
        if isinstance(location, dict):
            self.locations.append(location)
            self._media_files.append(None)
            self.modified_attributes.add('locations')
            return

        if isinstance(location, (str,) + _OLD_STR_TYPES):
            f = open(location, 'rb')
        else:
            f = location

        try:
            media_data = f.read()
            media_type = self._detect_media_type(
                media_data,
                manual_mimetype,
                # Needed by the extension-based ``mimetypes`` fallback.
                file_name=getattr(f, 'name', None),
            )
            self._validate_media_type(media_type)
            # A local file is recorded as its MIME type; the API responds
            # with an upload URL for it when the subject is saved.
            self.locations.append(media_type)
            self._media_files.append(media_data)
            self.modified_attributes.add('locations')
        finally:
            f.close()

    def _add_attached_image(
        self,
        src=None,
        content_type='image/png',
        external_link=True,
        metadata=None,
        client=None,
    ):
        """POST an attached-media record for this subject and return the
        ``src`` of the created media from the response.

        Raises :py:class:`ObjectNotSavedException` if the subject has no id.
        """
        if self.id is None:
            raise ObjectNotSavedException

        metadata = metadata or {}
        media_data = {
            'content_type': content_type,
            'external_link': external_link,
            'metadata': metadata,
        }
        if src:
            media_data['src'] = src

        if not client:
            client = Panoptes.client()

        with client:
            json_response, _ = self.http_post(
                '{}/attached_images'.format(self.id),
                json={'media': media_data},
            )

        return json_response['media'][0]['src']

    def _save_attached_image(self, attached_media, manual_mimetype=None,
                             metadata=None, client=None):
        """Synchronously create attached media for this subject.

        A dict of ``{mime_type: url}`` creates one external-link record per
        entry. A path or file object is read, type-checked, registered as a
        non-external record, and its bytes are uploaded to the returned URL.
        """
        if not client:
            client = Panoptes.client()

        with client:
            metadata = metadata or {}

            if isinstance(attached_media, dict):
                for content_type, url in attached_media.items():
                    self._add_attached_image(
                        src=url,
                        content_type=content_type,
                        metadata=metadata,
                        external_link=True,
                    )
                return

            if isinstance(attached_media, (str,) + _OLD_STR_TYPES):
                f = open(attached_media, 'rb')
            else:
                f = attached_media

            try:
                media_data = f.read()
                media_type = self._detect_media_type(
                    media_data,
                    manual_mimetype,
                    file_name=getattr(f, 'name', None),
                )
                self._validate_media_type(media_type)
            finally:
                f.close()

            file_url = self._add_attached_image(
                src=None,
                content_type=media_type,
                metadata=metadata,
                external_link=False,
            )
            self._upload_media(file_url, media_data, media_type)

    def save_attached_image(
        self,
        attached_media,
        manual_mimetype=None,
        metadata=None,
        client=None
    ):
        """
        Add an attached media item to this subject.

        NOTE: This should NOT be confused with subject location.
        A subject location is the content of the subject that a volunteer
        will classify. A subject attached media item is ancillary data
        associated with the subject that gets displayed on the subject's
        Talk page.

        - **attached_media** can be an open :py:class:`file` object, a path
          to a local file, or a :py:class:`dict` containing MIME types and
          URLs for remote media.
        - **manual_mimetype** optional string, passes in a specific MIME type
          for media item.
        - **metadata** can be a :py:class:`dict` that stores additional info
          on attached_media.
        - **client** optional Panoptes.client() instance. Sent as a parameter
          for threading purposes for parallelization so that thread uses the
          correct client context.

        Returns the :py:class:`concurrent.futures.Future` of the save. Outside
        an :py:meth:`async_saves` block the future has already completed when
        this method returns.

        Examples::

            # Upload image by sending in a :py:class:`file` object
            subject.save_attached_image(my_file)

            # Upload local image by passing path to file
            subject.save_attached_image('/data/image.jpg')

            # Upload local image and set mimetype and record's metadata
            subject.save_attached_image(attached_media=my_file,
                                        manual_mimetype='image/jpeg',
                                        metadata={'metadata_test': 'Object 1'})

            # Upload externally hosted image
            subject.save_attached_image({"image/png": "https://example.com/test.png"})

        We can utilize `async_saves` to upload/save attached_images in
        parallel.

        Examples::

            from concurrent.futures import as_completed

            subject = Subject(1234)
            # list of file locations
            local_files = [...]

            with Subject.async_saves():
                future_to_file = {subject.save_attached_image(file_location): file_location for file_location in local_files}
                for future in as_completed(future_to_file):
                    local_file = future_to_file[future]
                    try:
                        future.result()
                    except Exception as exc:
                        print(f"Upload failed for {local_file}")
        """
        if not client:
            client = Panoptes.client()

        metadata = metadata or {}

        def submit_to(executor):
            # Queue a retried synchronous save on ``executor``.
            return executor.submit(
                retry,
                self._save_attached_image,
                args=(attached_media, manual_mimetype, metadata, client),
                attempts=UPLOAD_RETRY_LIMIT,
                sleeptime=RETRY_BACKOFF_INTERVAL,
                retry_exceptions=(requests.exceptions.RequestException,),
                log_args=False,
            )

        with client:
            if hasattr(self._local, 'save_exec'):
                try:
                    return submit_to(self._local.save_exec)
                except RuntimeError:
                    # The ``async_saves`` pool has been shut down; forget it
                    # and fall back to a synchronous save, as ``save`` does.
                    del self._local.save_exec

            upload_exec = ThreadPoolExecutor(max_workers=ASYNC_SAVE_THREADS)
            try:
                return submit_to(upload_exec)
            finally:
                # Not in an ``async_saves`` block: wait for the task to finish.
                upload_exec.shutdown(wait=True)
class UnknownMediaException(Exception):
    """Raised when a media item's MIME type cannot be detected or is not
    in ``ALLOWED_MIME_TYPES``."""
# Register Subject with the link resolver twice: once without an explicit
# name (presumably resolved via its link slug -- confirm in LinkResolver)
# and once under the singular alias 'subject'.
LinkResolver.register(Subject)
LinkResolver.register(Subject, 'subject')