Source code for mongorest.middlewares

# -*- encoding: UTF-8 -*-
from __future__ import absolute_import, unicode_literals

from pydoc import locate
from werkzeug.contrib.sessions import SessionStore
from werkzeug.utils import dump_cookie, parse_cookie

from .collection import Collection
from .settings import settings
from .utils import deserialize

# Public names of this module: the two WSGI middleware classes defined below.
__all__ = [
    'AuthenticationMiddleware',
    'CORSMiddleware',
]


class AuthenticationMiddleware(object):
    """
    Authentication Middleware works with Cookie and Token Authentication.

    On every request the middleware:

    * resolves the dotted paths ``settings.SESSION_STORE`` and
      ``settings.AUTH_COLLECTION`` and validates the resulting classes;
    * stores a session under ``environ['session']``. The session id is taken
      from an ``Authorization: Token <sid>`` header or, when that scheme is
      not used, from the ``session_id`` cookie. Without a usable id a new
      session is created;
    * stores the result of ``auth_collection.get(...)`` in ``environ`` under
      the lower-cased name of the auth collection class;
    * adds ``Set-Cookie`` and ``HTTP_AUTHORIZATION`` response headers that
      carry the session id when that environ entry is truthy at the time
      the wrapped app starts its response;
    * saves the session once the wrapped app has returned, if the session
      reports ``should_save``.
    """

    # Authorization scheme prefix that introduces a session id.
    _TOKEN_PREFIX = 'Token '

    # 30 days in seconds; passed to ``dump_cookie`` as its third argument.
    _COOKIE_MAX_AGE = 30 * 24 * 60 * 60

    def __init__(self, app):
        self.app = app

    def __call__(self, environ, start_response):
        """
        Run the wrapped WSGI app with session and auth data in ``environ``.

        :param environ: WSGI environ dict; it is mutated in place.
        :param start_response: WSGI ``start_response`` callable.
        :return: whatever the wrapped app returns.
        :raises ValueError: if ``SESSION_STORE`` or ``AUTH_COLLECTION`` does
            not resolve to a subclass of the expected base class.
        """
        session_store = self._load_class(
            settings.SESSION_STORE,
            SessionStore,
            'SESSION_STORE must be a sub class of \'SessionStore\'',
        )()
        auth_collection = self._load_class(
            settings.AUTH_COLLECTION,
            Collection,
            'AUTH_COLLECTION must be a sub class of \'Collection\'',
        )
        auth_key = auth_collection.__name__.lower()

        # Only ask the store for an existing session when the client really
        # sent an id; otherwise start from a fresh session.
        session_id = self._requested_session_id(environ)
        if session_id:
            environ['session'] = session_store.get(session_id)
        else:
            environ['session'] = session_store.new()

        # '""' is the fallback handed to ``deserialize`` when the session
        # holds no entry for the auth collection.
        environ[auth_key] = auth_collection.get({
            '_id': deserialize(environ['session'].get(auth_key, '""'))
        })

        def authentication(status, headers, exc_info=None):
            # ``environ`` is re-read here on purpose: the wrapped app may have
            # replaced the session or the auth entry while handling the
            # request (e.g. on login/logout).
            if environ.get(auth_key):
                sid = environ['session'].sid
                headers.extend([
                    (
                        'Set-Cookie',
                        dump_cookie('session_id', sid, self._COOKIE_MAX_AGE),
                    ),
                    # NOTE(review): 'HTTP_AUTHORIZATION' is the WSGI environ
                    # key, not a standard response header name; kept as is
                    # because existing clients may read it.
                    ('HTTP_AUTHORIZATION', 'Token {0}'.format(sid)),
                ])

            return start_response(status, headers, exc_info)

        response = self.app(environ, authentication)

        if environ['session'].should_save:
            session_store.save(environ['session'])

        return response

    @staticmethod
    def _load_class(path, base, error_message):
        """Resolve dotted *path* to a subclass of *base* or raise ValueError."""
        located = locate(path)

        try:
            valid = bool(located) and issubclass(located, base)
        except TypeError:
            # ``locate`` found something that is not a class (e.g. a module).
            valid = False

        if not valid:
            raise ValueError(error_message)

        return located

    @classmethod
    def _requested_session_id(cls, environ):
        """Return the session id sent by the client, or None if there is none."""
        authorization = environ.get('HTTP_AUTHORIZATION', '')

        # The scheme must be the prefix of the header value; a substring
        # match would also accept values such as 'Bearer Token abc'.
        if authorization.startswith(cls._TOKEN_PREFIX):
            # Cookies are not consulted once the Token scheme is used, even
            # if the token itself turns out to be empty.
            return authorization[len(cls._TOKEN_PREFIX):].strip() or None

        cookies = environ.get('HTTP_COOKIE')
        if cookies:
            return parse_cookie(cookies).get('session_id') or None

        return None
class CORSMiddleware(object):
    """
    CORS Middleware used to overcome CORS restrictions.

    Appends the ``Access-Control-*`` headers configured in ``settings.CORS``
    to every response. Requests whose method is ``OPTIONS`` are answered by
    the middleware itself with ``200 OK``; the wrapped app is not called.
    """

    def __init__(self, app):
        self.app = app

    def __call__(self, environ, start_response):
        def cors(status, headers, exc_info=None):
            # Build the complete list first so ``headers`` is only touched
            # once every configured value has been looked up.
            cors_headers = [
                (name, settings.CORS[name])
                for name in (
                    'Access-Control-Allow-Origin',
                    'Access-Control-Allow-Methods',
                    'Access-Control-Allow-Headers',
                    'Access-Control-Allow-Credentials',
                )
            ]
            headers.extend(cors_headers)
            return start_response(status, headers, exc_info)

        if environ.get('REQUEST_METHOD') != 'OPTIONS':
            return self.app(environ, cors)

        # Preflight request: reply directly with the CORS headers attached.
        cors('200 OK', [('Content-Type', 'text/plain')])
        return [b'200 OK']