Source code for kcrw.apple_news.api

"""Library for making Apple News API requests"""
import base64
import hashlib
import hmac
import json
import logging
import os
import random
import requests
import six
from datetime import datetime


logger = logging.getLogger('kcrw.apple_news' + __name__)

EXT_MAPPING = {
    '.json': b'application/json',
    '.jpg': b'image/jpeg',
    '.jpeg': b'image/jpeg',
    '.gif': b'image/gif',
    '.png': b'image/png',
}


[docs]def ensure_binary(s, encoding='utf8'): if isinstance(s, six.text_type): s = s.encode(encoding) return s
[docs]def ensure_text(s, encoding='utf8'): if isinstance(s, six.binary_type): s = s.decode(encoding) return s
[docs]class AppleNewsError(Exception): """Exception class for errors related to Apple News API requests""" code = None data = None
[docs] def __init__(self, *args, **kw): super(AppleNewsError, self).__init__(*args) if 'code' in kw: self.code = kw['code'] if 'data' in kw: self.data = kw['data']
[docs]class API(object): """Apple News API object""" key_id = None key_secret = None channel_id = None url_base = 'https://news-api.apple.com'
[docs] def __init__(self, key_id, key_secret, channel_id): """ :param key_id: Apple News API Key Id :type key_id: str :param key_secret: Apple News API Secret :type key_secret: str :param channel_id: Apple News Channel Id :type channel_id: str """ self.key_id = key_id self.key_secret = key_secret self.channel_id = channel_id
[docs] def send_request(self, method, route, body=None, content_type=None): """Sends a signed request to the Apple News Publisher API. :param method: The HTTP method for the request (e.g. GET, POST, ...) :type method: str :param route: The API route for the request :type route: str :param body: The request body :type body: str :param content_type: The request content type :type content_type: str :returns: dict -- the JSON data returned by the API. :raises: AppleNewsException """ date = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ') url = self.url_base route = route.rstrip('/') if route: url = url + '/' + route canonical_request = ( ensure_binary(method, 'utf8') + ensure_binary(url, 'utf8') + ensure_binary(date, 'utf8') ) if body: canonical_request += ( ensure_binary(content_type, 'utf8') + ensure_binary(body, 'utf8') ) signature = self._create_signature(canonical_request) authorization = "HHMAC; key={}; signature={}; date={}".format( self.key_id, signature, date ) headers = {"Authorization": authorization} if body: headers["Content-Type"] = content_type resp = data = code = reason = None try: resp = requests.request(method, url, headers=headers, data=body) resp.raise_for_status() except requests.exceptions.RequestException: logger.exception('Requests error') if resp is not None: data = resp.json() code = resp.status_code reason = resp.reason raise AppleNewsError( 'Error during Apple News request to {} ({}: {})'.format( url, code, reason ), code=code, data=data ) if method != 'DELETE': return resp.json() else: return {'result': 'Deleted item at url: {}'.format(url)}
def _create_signature(self, canonical_request): key_bytes = base64.b64decode(self.key_secret) message = canonical_request signature = base64.b64encode( hmac.new(key_bytes, message, digestmod=hashlib.sha256).digest() ) return signature
[docs] def read_channel(self): """Read basic information about the current Apple News Channel :returns: dict -- the JSON data returned by the API. :raises: AppleNewsException """ method = "GET" route = 'channels/' + self.channel_id return self.send_request(method, route)
[docs] def create_article(self, article, metadata=None, assets=None): """Creates and uploads a multi-part article. See `Create an Article`_ ``article`` and ``metadata`` should contain JSON-serializable dictionaries according the API specification for `article data`_ and optional `article creation metadata`_. ``assets`` should contain a dict mapping of filenames to file data for all supplemental assets needed for the article. :param article: Article data in JSON serializable python dict :type article: dict :param metadata: Optional metadata in JSON serializable python dict :type article: dict -- or None :param assets: Dict with filename -> data mapping of additional assets :type assets: dict -- or None :returns: dict -- the JSON data returned by the API. :raises: AppleNewsException .. _Create an Article: https://developer.apple.com/documentation/apple_news/create_an_article .. _article data: https://developer.apple.com/documentation/apple_news/articledocument .. _article creation metadata: https://developer.apple.com/documentation/apple_news/create_article_metadata_fields """ # noqa: E501 if not article: raise AppleNewsError('No article body found for article') if assets: files = list(sorted(assets.items(), key=lambda e: ensure_text(e[0], 'utf8'))) else: files = [] files.insert(0, ('article.json', json.dumps(article))) if metadata: files.insert(0, ('metadata', json.dumps(metadata))) body, content_type = self._build_article_body(files) method = "POST" route = 'channels/{}/articles'.format(self.channel_id) return self.send_request(method, route, body, content_type)
[docs] def update_article(self, identifier, metadata, article=None, assets=None): """Updates an existing a article. See `Update an Article`_. Requires an ``identifier`` for an existing article, and `metadata`_ containing the current article ``revision`` identifier. All other arguments are optional. :param identifier: An identifier for an existing Apple News article :type identifier: str :param metadata: JSON formatted article metadata :type metadata: dict :param article: Article data in JSON serializable python dict :type article: dict -- or None :param assets: Dict with filename -> data mapping of additional assets :type assets: dict -- or None :returns: dict -- the JSON data returned by the API. :raises: AppleNewsException .. _Update an Article: https://developer.apple.com/documentation/apple_news/update_an_article .. _metadata: https://developer.apple.com/documentation/apple_news/metadata """ # noqa: E501 if not metadata or 'revision' not in metadata.get('data', {}): raise AppleNewsError( 'No valid metadata data found for article update' ) if assets: files = list(sorted(assets.items(), key=lambda e: ensure_text(e[0], 'utf8'))) else: files = [] files.insert(0, ('metadata', json.dumps(metadata))) if article: files.insert(1, ('article.json', json.dumps(article))) body, content_type = self._build_article_body(files) method = "POST" path = "articles/{}".format(identifier) return self.send_request(method, path, body, content_type)
[docs] def read_article(self, identifier): """Retrieves information about an existing a article. See `Read Article Information`_. Requires an ``identifier`` for an existing article. :param identifier: An identifier for an existing Apple News article :type identifier: str :returns: dict -- the JSON data returned by the API. :raises: AppleNewsException .. _Read Article Information: https://developer.apple.com/documentation/apple_news/read_article_information """ # noqa: E501 method = "GET" path = "articles/{}".format(identifier) return self.send_request(method, path)
[docs] def delete_article(self, identifier): """Deletes an existing a article. See `Delete an Article`_. Requires an ``identifier`` for an existing article. :param identifier: An identifier for an existing Apple News article :type identifier: str :returns: dict -- the JSON data returned by the API. :raises: AppleNewsException .. _Delete an Article: https://developer.apple.com/documentation/apple_news/delete_an_article """ # noqa: E501 method = "DELETE" path = "articles/{}".format(identifier) return self.send_request(method, path)
def _build_article_body(self, files): boundary = six.text_type(random.getrandbits(64)) boundary = boundary.encode('utf8') content_type = b"multipart/form-data; boundary=%s" % boundary parts = filter( None, map(lambda f: self._build_mime_part(boundary, *f), files) ) body = b"\r\n".join(parts) body += b"\r\n--%s--" % boundary return body, content_type def _build_mime_part(self, boundary, filename, file_data): content_type = self._guess_content_type(filename) if content_type is None: return None filename = ensure_binary(filename, 'utf8') file_data = ensure_binary(file_data, 'utf8') content_type = content_type part = b"--%s\r\n" % boundary part += b"Content-Type: %s\r\n" % content_type f_size = len(file_data) part += b"Content-Disposition: form-data; filename=%s; size=%d\r\n\r\n" % ( filename, f_size ) part += file_data return part @staticmethod def _guess_content_type(filename): extension = os.path.splitext(filename)[-1].lower() if filename == 'metadata': return b"application/json" return EXT_MAPPING.get(extension, None)