Source code for embypy.utils.connector

import json
import time
from requests.compat import urlparse, urlunparse, urlencode
import asyncio
import aiohttp
import datetime
from collections import deque
import websockets
import random
import ssl

from embypy import __version__
from embypy.utils.asyncio import async_func


class WebSocket:
    '''Basic websocket that runs functions when messages are received

    Parameters
    ----------
    conn : embypy.utils.Connector
      connector object
    url : str
      uri of websocket server
    ssl_str : str
      path to the ssl certificate for confirmation; any non-str value
      is stored unchanged and passed to `websockets.connect` as `ssl`

    Notes
    -----
    Attribute names ending in `_sync` are aliases of the same name
    without the suffix (see `__setattr__` and `__getattr__`).
    '''
    def __init__(self, conn, url, ssl_str=None):
        self.on_message = []
        self.url = url
        self.conn = conn
        # No connection exists until `handler` runs. Initialising the
        # attribute lets `send` and `close` be called before that
        # without raising AttributeError.
        self.ws = None
        if isinstance(ssl_str, str):
            self.ssl = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            self.ssl.load_verify_locations(cafile=ssl_str)
        else:
            self.ssl = ssl_str

    def __setattr__(self, name, value):
        # `foo_sync = x` is stored as `foo = x`
        if name.endswith('_sync'):
            return self.__setattr__(name[:-5], value)
        super().__setattr__(name, value)

    def __getattr__(self, name):
        # Only reached when normal lookup failed: retry without the
        # `_sync` suffix, otherwise let AttributeError propagate.
        if name.endswith('_sync'):
            return self.__getattr__(name[:-5])
        return self.__getattribute__(name)

    def connect(self):
        '''Establish a connection (currently a no-op placeholder)'''
        # TODO - authenticate to emby
        #self.loop.create_task(self.handler())

    @async_func
    async def handler(self):
        '''Handle loop, get and process messages

        Connects to `self.url`, then passes every received message to
        each callable in `self.on_message` as `handle(self, message)`.
        Coroutine functions are awaited, plain callables are called.
        The loop ends once `self.ws` has been cleared by `close`.
        '''
        self.ws = await websockets.connect(self.url, ssl=self.ssl)
        while self.ws:
            message = await self.ws.recv()
            for handle in self.on_message:
                if asyncio.iscoroutinefunction(handle):
                    await handle(self, message)
                else:
                    handle(self, message)

    @async_func
    async def send(self, message):
        '''Send a message over the socket

        Returns
        -------
        False if there is no open connection, otherwise whatever the
        underlying `send` call returns
        '''
        if not self.ws:
            return False
        return await self.ws.send(message)

    def close(self):
        '''close connection to socket

        Safe to call when no connection was ever opened.
        '''
        ws, self.ws = self.ws, None
        if ws is None:
            return
        pending = ws.close()
        # `websockets` implements close() as a coroutine; calling it
        # without scheduling it would never perform the close.
        if asyncio.iscoroutine(pending):
            try:
                loop = asyncio.get_running_loop()
            except RuntimeError:
                # NOTE(review): no running loop is available to drive the
                # close handshake from this synchronous method, so the
                # coroutine is discarded (as it effectively was before).
                pending.close()
            else:
                # keep a reference so the task is not garbage collected
                self._close_task = loop.create_task(pending)


class Connector:
    '''Class responsible for communication with emby

    Parameters
    ----------
    url : str
      url to connect to
    address-remote : str, optional
      alt url to connect to, public facing (see notes);
      `address_remote` is accepted as an alias
    ssl : str, optional
      path to ssl certificate - for self signed certs
    userid : str, optional
      emby id of the user you wish to connect as
    api_key : str
      api key generated by emby, used for authentication
    token : str
      similar to api key, but is meant for user logins
    username : str, optional
      username for login (see notes)
    password : str, optional
      password for login (see notes)
    device_id : str
      device id as registered in emby (default 'EmbyPy')
    timeout : int
      number of seconds to wait before timeout for a request (default 30)
    tries : int
      number of times to try a request before throwing an error (default 3)
    jellyfin : bool
      if this is a jellyfin (false = emby) server
    ws : any, optional
      if this key is present a `WebSocket` object is created

    Raises
    ------
    ValueError
      if neither (`api_key` and `userid`) nor (`username` and `password`)
      are given

    Notes
    -----
    This class/object should NOT be used (except internally).

    If an address-remote url is given, then that will be used for output,
    such as the `embypy.objects.EmbyObject.url` attribute.

    `url` will always be used when making requests - thus I recommend
    using the local address for `url` and the remote address for
    `address-remote`

    Jellyfin and emby have some url differences right now, so set
    jellyfin's url scheme to true/false
    [or None (default) for auto-detect]

    Attribute names ending in `_sync` are aliases of the same name
    without the suffix.
    '''
    def __init__(self, url, **kargs):
        # make sure this thread has an event loop
        try:
            asyncio.get_event_loop()
        except RuntimeError:
            asyncio.set_event_loop(asyncio.new_event_loop())

        if ('api_key' not in kargs or 'userid' not in kargs) and \
           ('username' not in kargs or 'password' not in kargs):
            raise ValueError(
                'provide api key and userid or username/password'
            )

        # 'address-remote' is not a valid identifier, so it can only be
        # passed through dict unpacking; accept the underscore form too.
        urlremote = kargs.get('address-remote') or kargs.get('address_remote')

        # Build the SSL context from the certificate path *before* the
        # path is replaced, so `cafile` receives the path and not the
        # context object itself.
        ssl_opt = kargs.get('ssl', True)
        if ssl_opt and isinstance(ssl_opt, str):
            context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            context.load_verify_locations(cafile=ssl_opt)
            ssl_opt = context
        self.ssl = ssl_opt

        self.userid = kargs.get('userid')
        self.token = kargs.get('token')
        self.api_key = kargs.get('api_key', self.token)
        self.username = kargs.get('username')
        self.password = kargs.get('password')
        self.device_id = kargs.get('device_id', 'EmbyPy')
        self.timeout = kargs.get('timeout', 30)
        self.tries = kargs.get('tries', 3)
        self.jellyfin = kargs.get('jellyfin')
        self.url = urlparse(url)
        self.urlremote = urlparse(urlremote) if urlremote else urlremote
        # guards against re-entrant logins (login itself issues a request)
        self.attempt_login = False
        self._session_locks = {}  # event loop -> asyncio.Lock
        self._session_uses = {}   # hash(loop) -> number of active users
        self._sessions = {}       # hash(loop) -> aiohttp.ClientSession

        # connect to websocket if user wants to
        if 'ws' in kargs:
            self.ws = WebSocket(self, self.get_url(websocket=True), self.ssl)
        else:
            self.ws = None

    def __setattr__(self, name, value):
        # `foo_sync = x` is stored as `foo = x`
        if name.endswith('_sync'):
            return self.__setattr__(name[:-5], value)
        super().__setattr__(name, value)

    def __getattr__(self, name):
        # Only reached when normal lookup failed: retry without the
        # `_sync` suffix, otherwise let AttributeError propagate.
        if name.endswith('_sync'):
            return self.__getattr__(name[:-5])
        return self.__getattribute__(name)

    def _auth_headers(self):
        '''Build the authorization headers for the current token state'''
        auth_header = 'MediaBrowser Client="{0}",Device="{0}",' \
                      'DeviceId="{1}",Version="{2}"'
        auth_header = auth_header.format('EmbyPy', self.device_id, __version__)
        if self.token:
            auth_header += f',Token="{self.token}"'

        headers = {
            'Authorization': auth_header,
            'X-Emby-Authorization': auth_header,
        }
        if self.token:
            headers['X-MediaBrowser-Token'] = self.token
        return headers

    async def _get_session(self):
        '''Return the session of the running loop, creating it if needed

        Every call increases the use count of the session and must be
        paired with a call to `_end_session`.
        '''
        loop_id = hash(asyncio.get_running_loop())
        headers = self._auth_headers()

        async with await self._get_session_lock():
            session = self._sessions.get(loop_id)
            if not session:
                session = aiohttp.ClientSession(
                    headers=headers,
                    connector=aiohttp.TCPConnector(ssl_context=self.ssl),
                )
                self._sessions[loop_id] = session
                self._session_uses[loop_id] = 1
            else:
                self._session_uses[loop_id] += 1
            return session

    async def _end_session(self):
        '''Drop one use of the running loop's session, close it at zero'''
        loop_id = hash(asyncio.get_running_loop())

        async with await self._get_session_lock():
            self._session_uses[loop_id] -= 1
            session = self._sessions.get(loop_id)
            if session and self._session_uses[loop_id] <= 0:
                await session.close()
                self._sessions[loop_id] = None

    async def _get_session_lock(self):
        '''Return the lock guarding the session state of the running loop'''
        loop = asyncio.get_running_loop()
        # The previous version also wrote `self._sessions[loop] = None`
        # here. Sessions are keyed by hash(loop), so that entry was never
        # read and only polluted the session table.
        lock = self._session_locks.get(loop)
        if lock is None:
            lock = self._session_locks[loop] = asyncio.Lock()
        return lock

    @async_func
    async def info(self):
        '''Return the json of `/system/info/public` (local url is used)'''
        return await self.getJson(
            '/system/info/public',
            remote=False,
        )

    @property
    @async_func
    async def is_jellyfin(self):
        '''Whether the server is jellyfin, auto-detected when unset

        When `jellyfin` is None the public server info is fetched once:
        a three part numeric `Version` with a major number of at least
        10 is treated as jellyfin. The result is cached in `jellyfin`.
        '''
        if self.jellyfin is None:
            self.jellyfin = False
            info = await self.info()
            try:
                ver = tuple(map(int, info.get('Version', '0.0.0').split('.')))
                if len(ver) == 3 and ver[0] >= 10:
                    self.jellyfin = True
            except ValueError:
                # non numeric version string - keep the False default
                pass
        return self.jellyfin

    @async_func
    async def login_if_needed(self):
        '''Log in when credentials were given and no token exists yet'''
        # authenticate to emby if password was given
        if self.password and self.username and not self.token:
            return await self.login()

    @async_func
    async def login(self):
        '''Authenticate by username/password and store the new token

        Does nothing when no username is set or a login is already in
        progress. On success `token`, `api_key` and `userid` are
        replaced and the default headers of the running loop's session
        are refreshed.
        '''
        if not self.username or self.attempt_login:
            return

        self.attempt_login = True
        try:
            data = await self.postJson(
                '/Users/AuthenticateByName',
                data={
                    'Username': self.username,
                    'Pw': self.password,
                },
                send_raw=True,
                format='json',
            )

            self.token = data.get('AccessToken', '')
            self.userid = data.get('User', {}).get('Id')
            self.api_key = self.token

            session = await self._get_session()
            try:
                # Rebuild the headers from scratch instead of appending to
                # the existing value: the session may already carry a
                # token (expired one, or a freshly created session), and
                # appending would put two Token entries in the header.
                headers = self._auth_headers()
                defaults = session._default_headers
                defaults['Authorization'] = headers['Authorization']
                defaults['X-Emby-Authorization'] = \
                    headers['X-Emby-Authorization']
                if self.token:
                    defaults['X-MediaBrowser-Token'] = self.token
                else:
                    defaults.pop('X-MediaBrowser-Token', None)
            finally:
                await self._end_session()
        finally:
            self.attempt_login = False

    def get_url(
        self, path='/', websocket=False, remote=True,
        attach_api_key=True, userId=None, pass_uid=False, **query
    ):
        '''construct a url for an emby request

        Parameters
        ----------
        path : str
          uri path (excluding domain and port) of get request for emby;
          `{UserId}`, `{ApiKey}` and `{DeviceId}` placeholders are filled in
        websocket : bool, optional
          if true, then `ws(s)` are used instead of `http(s)`
        remote : bool, optional
          if true, remote-address is used when one was given (default True)
        attach_api_key : bool, optional
          if true, apikey and device id are added to the query
          (default True)
        userId : str, optional
          uid to use, if none, default is used
        pass_uid : bool, optional
          if true, uid is added to the query (default False)
        query : karg dict
          additional parameters to set (part of url after the `?`)

        See Also
        --------
        get :
        getJson :
        post :
        delete :

        Returns
        -------
        full url
        '''
        userId = userId or self.userid
        if attach_api_key and self.api_key:
            query.update({
                'api_key': self.api_key,
                'deviceId': self.device_id
            })
        if pass_uid:
            query['userId'] = userId

        if remote:
            url = self.urlremote or self.url
        else:
            url = self.url

        if websocket:
            # http -> ws, https -> wss
            scheme = url.scheme.replace('http', 'ws')
        else:
            scheme = url.scheme

        # '{params}' is a placeholder for the query string so that the
        # path placeholders and the query are filled in by one format()
        url = urlunparse(
            (scheme, url.netloc, path, '', '{params}', '')
        ).format(
            UserId=userId,
            ApiKey=self.api_key,
            DeviceId=self.device_id,
            params=urlencode(query)
        )

        # an empty query leaves a dangling '?'
        return url[:-1] if url[-1] == '?' else url

    @async_func
    async def _process_resp(self, resp):
        '''Decide whether a response is final

        Returns
        -------
        bool
          True if the response can be handed to the caller, False if the
          request should be retried (a login is attempted first on a
          missing/401 response when a username is set; 502/503/504 wait
          for a short random time)
        '''
        if (not resp or resp.status == 401) and self.username:
            await self.login()
            return False
        if not resp:
            return False
        if resp.status in (502, 503, 504):
            await asyncio.sleep(random.random()*4+0.2)
            return False
        return True

    @staticmethod
    @async_func
    async def resp_to_json(resp):
        '''Parse the body of a response as json

        Raises
        ------
        RuntimeError
          if the response does not have a json content type; the message
          contains the status code and the raw body
        '''
        try:
            return await resp.json()
        except aiohttp.client_exceptions.ContentTypeError as err:
            body = await resp.text()
            raise RuntimeError(
                'Unexpected JSON output (status: {}): "{}"'.format(
                    resp.status,
                    body,
                )
            ) from err

    def add_on_message(self, func):
        '''add function that handles websocket messages'''
        return self.ws.on_message.append(func)

    @async_func
    async def _req(self, method, path, params={}, **query):
        '''Issue a request, retrying up to `self.tries` times

        Parameters
        ----------
        method : callable
          bound session method to call (e.g. `session.get`)
        path : str
          same as get_url
        params : dict
          extra keyword arguments for `method` (never modified here)
        query : kargs dict
          additional info to pass to get_url

        Returns
        -------
        the response object accepted by `_process_resp`

        Raises
        ------
        aiohttp.ClientConnectionError
          if no attempt produced an acceptable response
        '''
        await self.login_if_needed()

        last_error = None
        for attempt in range(self.tries):
            # rebuilt on every attempt - a login in between changes api_key
            url = self.get_url(path, **query)
            try:
                resp = await method(url, timeout=self.timeout, **params)
                if await self._process_resp(resp):
                    return resp
                if resp:
                    # hand the connection of the rejected response back
                    # instead of keeping it open until garbage collection
                    resp.release()
                await asyncio.sleep(random.random()*attempt + 0.2)
            except (
                asyncio.exceptions.TimeoutError,
                aiohttp.ClientConnectionError,
            ) as err:
                # deliberate best effort: remember the error and retry
                last_error = err

        raise aiohttp.ClientConnectionError(
            'Emby server is probably down'
        ) from last_error

    @async_func
    async def get(self, path, **query):
        '''return a get request

        Parameters
        ----------
        path : str
          same as get_url
        query : kargs dict
          additional info to pass to get_url

        See Also
        --------
        get_url :
        getJson :

        Returns
        -------
        tuple
          (status code, response body as text)
        '''
        # acquired outside the try block so that a failed acquire does
        # not release a use that was never counted
        session = await self._get_session()
        try:
            async with await self._req(
                session.get,
                path,
                **query
            ) as resp:
                return resp.status, await resp.text()
        finally:
            await self._end_session()

    @async_func
    async def delete(self, path, **query):
        '''send a delete request

        Parameters
        ----------
        path : str
          same as get_url
        query : kargs dict
          additional info to pass to get_url

        See Also
        --------
        get_url :

        Returns
        -------
        int
          status code of the response
        '''
        session = await self._get_session()
        try:
            async with await self._req(
                session.delete,
                path,
                **query
            ) as resp:
                return resp.status
        finally:
            await self._end_session()

    @async_func
    async def post(self, path, data={}, send_raw=False, **query):
        '''sends post request

        Parameters
        ----------
        path : str
          same as get_url
        data : dict
          post data to send
        send_raw : bool
          if true `data` is passed to the session as `json=`,
          otherwise it is serialised with `json.dumps` and sent as `data=`
        query : kargs dict
          additional info to pass to get_url

        See Also
        --------
        postJson :
        get_url :

        Returns
        -------
        tuple
          (status code, response body as text)
        '''
        return await self._post(
            path,
            return_json=False,
            data=data,
            send_raw=send_raw,
            **query,
        )

    @async_func
    async def postJson(self, path, data={}, send_raw=False, **query):
        '''sends post request and parses the response as json

        Parameters
        ----------
        path : str
          same as get_url
        data : dict
          post data to send
        send_raw : bool
          if true `data` is passed to the session as `json=`,
          otherwise it is serialised with `json.dumps` and sent as `data=`
        query : kargs dict
          additional info to pass to get_url

        See Also
        --------
        post :
        get_url :

        Returns
        -------
        the parsed json content of the response
        '''
        return await self._post(
            path,
            return_json=True,
            data=data,
            send_raw=send_raw,
            **query,
        )

    @async_func
    async def _post(self, path, return_json, data, send_raw, **query):
        '''sends post request (shared implementation of post/postJson)

        Parameters
        ----------
        path : str
          same as get_url
        return_json : bool
          if true the parsed json is returned, otherwise (status, text)
        data : dict
          post data to send
        send_raw : bool
          if true `data` is passed to the session as `json=`,
          otherwise it is serialised with `json.dumps` and sent as `data=`
        query : kargs dict
          additional info to pass to get_url

        See Also
        --------
        get_url :

        Returns
        -------
        parsed json, or a tuple of (status code, response body as text)
        '''
        session = await self._get_session()
        try:
            if send_raw:
                params = {"json": data}
            else:
                params = {"data": json.dumps(data)}

            async with await self._req(
                session.post,
                path,
                params=params,
                **query
            ) as resp:
                if return_json:
                    return await Connector.resp_to_json(resp)
                else:
                    return resp.status, await resp.text()
        finally:
            await self._end_session()

    @async_func
    async def getJson(self, path, **query):
        '''wrapper for get, parses response as json

        Parameters
        ----------
        path : str
          same as get_url
        query : kargs dict
          additional info to pass to get_url

        See Also
        --------
        get_url :
        get :

        Returns
        -------
        dict
          the response content as a dict
        '''
        session = await self._get_session()
        try:
            async with await self._req(
                session.get,
                path,
                **query
            ) as resp:
                return await Connector.resp_to_json(resp)
        finally:
            await self._end_session()