from embypy.utils.asyncio import async_func
import arrow
import datetime
# Blank item template: every field sent when posting an item update.
# It is merged *under* an object's own dict (see ``EmbyObject.send``), so any
# field the object does not carry is still present in the payload.
_EMPTY_OBJ = {
    # identity / naming
    "Id": "",
    "Name": "",
    "OriginalTitle": "",
    "ForcedSortName": "",
    # ratings
    "CommunityRating": "",
    "CriticRating": "",
    # ordering within a series / season
    "IndexNumber": "",
    "AirsBeforeSeasonNumber": "",
    "AirsAfterSeasonNumber": "",
    "AirsBeforeEpisodeNumber": "",
    "ParentIndexNumber": None,
    "DisplayOrder": "",
    # music
    "Album": "",
    "AlbumArtists": [],
    "ArtistItems": [],
    # description / airing
    "Overview": "",
    "Status": "",
    "AirDays": [],
    "AirTime": "",
    "Genres": [],
    "Tags": [],
    "Studios": [],
    # dates
    "PremiereDate": "",
    "DateCreated": "",
    "EndDate": None,
    "ProductionYear": "",
    # presentation / ratings boards
    "AspectRatio": "",
    "Video3DFormat": "",
    "OfficialRating": "",
    "CustomRating": "",
    "People": [],
    # metadata locking
    "LockData": False,
    "LockedFields": [],
    # external provider ids, all blank by default
    "ProviderIds": dict.fromkeys(
        (
            "MusicBrainzReleaseGroup",
            "MusicBrainzAlbumArtist",
            "MusicBrainzAlbum",
            "MusicBrainzArtist",
            "MusicBrainzTrack",
            "AudioDbAlbum",
            "AudioDbArtist",
        ),
        "",
    ),
    "PreferredMetadataLanguage": "",
    "PreferredMetadataCountryCode": "",
    "Taglines": [],
}
class EmbyObject(object):
    '''Default Emby object template.

    Wraps the json dict emby returns for an item and exposes its fields as
    properties. Setters only change the local dict; call `send` (or `post`)
    to push the changes to the server.

    Parameters
    ----------
    object_dict : dict
        dictionary with json info returned from emby
    connector : embypy.utils.connector.Connector
        connector object to make upstream api calls
    save : bool
        if true, register this object in the shared cache of known objects
        (saves space/increases speed/reduces issues);
        only set to false if creating a temp object that will be thrown out
    '''

    # Cache of already-created objects keyed by their emby ``Id``.
    # It lives on the base class, so all instances share it.
    known_objects = {}

    def __init__(self, object_dict, connector, save=True):
        self.connector = connector
        self.object_dict = object_dict
        self.extras = {}
        item_id = object_dict.get('Id')
        # Objects without an ``Id`` are not cached: storing them under the
        # key ``None`` would let `process` hand back an unrelated object
        # for any dict whose ``Id`` is ``None``.
        if save and item_id is not None:
            EmbyObject.known_objects[item_id] = self

    def __eq__(self, other):
        '''Two emby objects are equal when they share the same id.'''
        return isinstance(other, EmbyObject) and self.id == other.id

    def __hash__(self):
        # Defining __eq__ alone sets __hash__ to None (unhashable);
        # hash on the same key that __eq__ compares.
        return hash(self.id)

    def __setattr__(self, name, value):
        # ``foo_sync`` is an alias of ``foo``: strip the suffix and retry.
        if name.endswith('_sync'):
            return self.__setattr__(name[:-5], value)
        super().__setattr__(name, value)

    def __getattr__(self, name):
        # Only reached when normal lookup fails.
        # ``foo_sync`` is an alias of ``foo``: strip the suffix and retry.
        if name.endswith('_sync'):
            return self.__getattr__(name[:-5])
        return self.__getattribute__(name)

    def __str__(self):
        return self.name

    def __repr__(self):
        return '<{} {}>'.format(self.type, self.id)

    @staticmethod
    def _to_timestamp(value):
        '''Convert a datetime (or pass through a str) for a date field.

        Raises
        ------
        ValueError
            if value is neither a datetime nor a str
        '''
        if isinstance(value, datetime.datetime):
            # NOTE(review): the literal 'Z' is appended without converting
            # to UTC -- confirm callers pass UTC datetimes.
            return value.strftime("%Y-%m-%dT%H:%M:%SZ")
        if not isinstance(value, str):
            raise ValueError('value must be datetime or str')
        return value

    @property
    def id(self):
        '''string with hexadecimal hash representing the id of this
        object in emby (``Id``, falling back to ``ItemId``)
        '''
        return self.object_dict.get('Id') or self.object_dict.get('ItemId')

    @property
    def name(self):
        '''name of the item

        See Also
        --------
            post :
        '''
        return self.object_dict.get('Name', '')

    @name.setter
    def name(self, value):
        self.object_dict['Name'] = value

    @property
    def title(self):
        '''same as name

        See Also
        --------
            post :
        '''
        return self.name

    @title.setter
    def title(self, value):
        self.name = value

    @property
    def path(self):
        '''get the filepath of the media file (not url)

        See Also
        --------
            url :
        '''
        return self.object_dict.get('Path', '')

    @property
    def watched(self):
        '''returns True if item has been watched
        (None when no play state is present in the dict)
        '''
        return self.object_dict.get('UserData', {}).get('Played')

    @property
    def played(self):
        '''same as `watched`'''
        return self.watched

    @property
    def percentage_played(self):
        '''returns played fraction (nominally [0,1]) of item'''
        played = self.object_dict.get(
            'UserData', {}
        ).get('PlaybackPositionTicks')
        # Fall back to the position itself (giving 1.0), then to 1,
        # so the division below can never be by zero.
        total = self.object_dict.get('RunTimeTicks') or played or 1
        return (played or 0) / total

    @property
    def duration(self):
        '''returns duration of item in seconds'''
        # ``or 0`` also covers a key that is present but None.
        return (self.object_dict.get('RunTimeTicks') or 0) / (10**7)

    @property
    def play_count(self):
        '''returns users playcount for item'''
        return self.object_dict.get('UserData', {}).get('PlayCount', 0)

    @property
    def favorite(self):
        '''returns True if user favorited item'''
        return self.object_dict.get('UserData', {}).get('IsFavorite', False)

    @async_func
    async def _mark(self, type, value):
        '''POST (value truthy) or DELETE (value falsy) this item under
        the user's ``type`` collection, e.g. ``FavoriteItems``.

        |coro|
        '''
        # ``{{UserId}}`` survives format() as a literal ``{UserId}``
        # placeholder in the url.
        url = '/Users/{{UserId}}/{type}/{id}'.format(type=type, id=self.id)
        if value:
            (await self.connector.post(url)).close()
        else:
            (await self.connector.delete(url)).close()

    @async_func
    async def setFavorite(self, value=True):
        '''mark (or unmark, if value is false) the item as a favorite

        |coro|
        '''
        await self._mark('FavoriteItems', value)

    @async_func
    async def setWatched(self, value=True):
        '''mark (or unmark, if value is false) the item as played

        |coro|
        '''
        await self._mark('PlayedItems', value)

    @property
    def type(self):
        '''get the object type (general)

        See Also
        --------
            media_type :
        '''
        return self.object_dict.get('Type', 'Object')

    @property
    def media_type(self):
        '''get the object type (specific)

        See Also
        --------
            type :
        '''
        return self.object_dict.get('MediaType', 'Object')

    @property
    def genres(self):
        '''list of genres

        See Also
        --------
            post :
            tags :
        '''
        return self.object_dict.get('Genres', [])

    @genres.setter
    def genres(self, genres: list):
        self.object_dict['Genres'] = genres

    @property
    def tags(self):
        '''list of tags

        See Also
        --------
            post :
            genres :
        '''
        return self.object_dict.get('Tags', [])

    @tags.setter
    def tags(self, tags: list):
        self.object_dict['Tags'] = tags

    @property
    def overview(self):
        '''the description of the item

        See Also
        --------
            post :
        '''
        return self.object_dict.get('Overview', '')

    @overview.setter
    def overview(self, value):
        self.object_dict['Overview'] = value

    @property
    def community_rating(self):
        '''int [0-10] with the rating of the item

        See Also
        --------
            post :
        '''
        return self.object_dict.get('CommunityRating', 0)

    @community_rating.setter
    def community_rating(self, value):
        self.object_dict['CommunityRating'] = value

    @property
    def primary_image_url(self):
        '''url of the main poster image'''
        path = '/Items/{}/Images/Primary'.format(self.id)
        return self.connector.get_url(path, attach_api_key=False)

    @property
    def date(self):
        '''alias of premier_date'''
        return self.premier_date

    @date.setter
    def date(self, value):
        self.premier_date = value

    @property
    def premier_date(self):
        '''datetime of when the item premiered (aired/released) (or None)'''
        ts = self.object_dict.get('PremiereDate')
        if not ts:
            return None
        return arrow.get(ts).datetime

    @premier_date.setter
    def premier_date(self, value):
        self.object_dict['PremiereDate'] = self._to_timestamp(value)

    @property
    def date_created(self):
        '''datetime of when the item was added to the server (or None)'''
        ts = self.object_dict.get('DateCreated')
        if not ts:
            return None
        return arrow.get(ts).datetime

    @date_created.setter
    def date_created(self, value):
        self.object_dict['DateCreated'] = self._to_timestamp(value)

    @property
    def parent_id(self):
        '''id of the parent object

        See Also
        --------
            parent :
        '''
        return self.object_dict.get('ParentId')

    @property
    @async_func
    async def parent(self):
        '''parent object as a subclass of EmbyObject (or None)

        |coro|
        '''
        if self.parent_id:
            return await self.process(self.parent_id)
        return None

    @property
    def download_url(self):
        '''url to download the item'''
        return self.connector.get_url('/Items/{}/Download'.format(self.id))

    @property
    @async_func
    async def url(self):
        '''url of the item

        |coro|

        Notes
        -----
        if remote-address was given, then that is used as the base
        '''
        if await self.connector.is_jellyfin:
            path = '/web/index.html#!/details?id={}'
        else:
            path = '/web/itemdetails.html?id={}'
        path = path.format(self.id)
        return self.connector.get_url(path, attach_api_key=False)

    @property
    def provider_ids(self):
        '''dict of external provider ids (empty dict if absent)'''
        return self.object_dict.get("ProviderIds", {})

    @property
    def tmdbid(self):
        '''the ``Tmdb`` provider id (or None)'''
        return self.provider_ids.get("Tmdb")

    @property
    def imdbid(self):
        '''the ``Imdb`` provider id (or None)'''
        return self.provider_ids.get("Imdb")

    @async_func
    async def update(self, fields=''):
        '''reload object info from emby

        |coro|

        Parameters
        ----------
        fields : str
            additional fields to request when updating

        Returns
        -------
        EmbyObject
            this object, with its dict refreshed and `extras` cleared

        See Also
        --------
            refresh : same thing
            send :
            post :
        '''
        path = 'Users/{{UserId}}/Items/{}'.format(self.id)
        requested = 'Path,Overview,PremiereDate'
        if fields:
            requested += ',' + fields
        info = await self.connector.getJson(
            path,
            remote=False,
            Fields=requested
        )
        self.object_dict.update(info)
        self.extras = {}
        return self

    @async_func
    async def refresh(self, fields=''):
        '''Same as update

        |coro|

        See Also
        --------
            update :
        '''
        # Forward ``fields``; it used to be accepted but silently dropped.
        return await self.update(fields)

    @async_func
    async def send(self):
        '''send data that was changed to emby

        |coro|

        This should be used after using any of the setter.
        Not necessarily immediately, but soon after.

        Returns
        -------
        tuple
            the ``(status, resp)`` pair returned by the connector's post

        See Also
        --------
            post: same thing
            update :
            refresh :
        '''
        # The whole dict is sent, padded with blank defaults: per the
        # original author, emby breaks on partial item updates.
        data = {**_EMPTY_OBJ, **self.object_dict}
        path = 'Items/{}'.format(self.id)

        status, resp = await self.connector.post(
            path,
            data=data,
            remote=False,
            send_raw=True,
            headers={'Content-Type': 'application/json'},
        )
        if status in (400, 415):
            # Reload through a throwaway wrapper around the same dict.
            # save=False keeps it out of the cache, so it cannot replace
            # the (possibly subclass) instance registered for this id.
            await EmbyObject(
                self.object_dict, self.connector, save=False
            ).update()
            # Retry once with the same payload, this time not sent raw.
            status, resp = await self.connector.post(
                path,
                data=data,
                remote=False,
                send_raw=False,
                headers={'Content-Type': 'application/json'},
            )
        return status, resp

    @async_func
    async def post(self):
        '''Same as send

        |coro|

        See Also
        --------
            send :
        '''
        return await self.send()

    @async_func
    async def process(self, object_dict):
        '''[for internal use] convert json/dict into python object

        |coro|

        Parameters
        ----------
        object_dict : dict
            json representation of object from emby

        Notes
        -----
        if a string is given, it is assumed to be an id, obj is returned
        (None if the lookup fails).
        if a list is given, this method is called for each item in list.

        Returns
        -------
        EmbyObject
            the object that is represented by the json dict
        list
            if input is a list, list is returned
        '''
        # if ID was given, create dummy object
        # and update it to get full dict
        if isinstance(object_dict, str):
            existing = EmbyObject.known_objects.get(object_dict)
            if existing:
                return existing
            try:
                obj = EmbyObject(
                    {"Id": object_dict},
                    self.connector,
                    save=False
                )
                object_dict = (await obj.update()).object_dict
            except Exception:
                # Best effort: a failed lookup yields None.
                # Not a bare ``except`` so that task cancellation and
                # KeyboardInterrupt still propagate.
                return None

        # if nothing was given, return it back
        # if already created object was given, return it back too
        if not object_dict or isinstance(object_dict, EmbyObject):
            return object_dict

        # if a json dict that's really just a list was given,
        # convert to list
        if isinstance(object_dict, dict) and \
                {'Items', 'TotalRecordCount'} <= object_dict.keys():
            object_dict = object_dict['Items']

        # if a list was given,
        # process each item in list (falsy results are dropped)
        if isinstance(object_dict, list):
            items = []
            for item in object_dict:
                item = await self.process(item)
                if item:
                    items.append(item)
            return items

        # otherwise we probably have an object dict
        # so we should process that

        # if dict has no id, it's a fake
        if 'Id' not in object_dict and 'ItemId' not in object_dict:
            return object_dict

        # if object is already stored,
        # update with existing info and return
        item_id = object_dict.get('Id', object_dict.get('ItemId'))
        existing = EmbyObject.known_objects.get(item_id)
        if existing:
            existing.object_dict.update(object_dict)
            return existing

        # Imported here rather than at module level
        # (presumably to avoid a circular import -- these modules
        # subclass EmbyObject; confirm).
        import embypy.objects.folders
        import embypy.objects.videos
        import embypy.objects.misc

        # if object is not already stored,
        # figure out its type (if unknown use this base class)
        # create an object with subclass of that type
        # return
        if 'AppName' in object_dict:
            object_dict['Type'] = 'Device'
        elif 'HasPassword' in object_dict:
            object_dict['Type'] = 'User'

        objects = {
            'Audio': embypy.objects.misc.Audio,
            'Person': embypy.objects.misc.Person,
            'Video': embypy.objects.videos.Video,
            'Movie': embypy.objects.videos.Movie,
            'Trailer': embypy.objects.videos.Trailer,
            'AdultVideo': embypy.objects.videos.AdultVideo,
            'MusicVideo': embypy.objects.videos.MusicVideo,
            'Episode': embypy.objects.videos.Episode,
            'Folder': embypy.objects.folders.Folder,
            'Playlist': embypy.objects.folders.Playlist,
            'BoxSet': embypy.objects.folders.BoxSet,
            'MusicAlbum': embypy.objects.folders.MusicAlbum,
            'MusicArtist': embypy.objects.folders.MusicArtist,
            'Season': embypy.objects.folders.Season,
            'Series': embypy.objects.folders.Series,
            'Game': embypy.objects.misc.Game,
            'GameSystem': embypy.objects.folders.GameSystem,
            'Photo': embypy.objects.misc.Photo,
            'Book': embypy.objects.misc.Book,
            'Image': embypy.objects.misc.Image,
            'Device': embypy.objects.misc.Device,
            'User': embypy.objects.misc.User,
            'Default': EmbyObject,
        }

        cls = objects.get(object_dict.get('Type', 'Default'), EmbyObject)
        return cls(object_dict, self.connector)