From bb7ba68228615a1e81eb95a14d80434f071ba9ba Mon Sep 17 00:00:00 2001 From: Carl Furrow Date: Sat, 9 Dec 2023 14:31:36 -0500 Subject: [PATCH] update: bring in upstream changes from pyicloud --- src/icloudpd/authentication.py | 2 +- src/icloudpd/base.py | 4 +- src/pyicloud_ipd/__init__.py | 1 + src/pyicloud_ipd/base.py | 20 +- src/pyicloud_ipd/cmdline.py | 197 +++-- src/pyicloud_ipd/exceptions.py | 45 +- src/pyicloud_ipd/services/__init__.py | 2 + src/pyicloud_ipd/services/account.py | 337 ++++++++- src/pyicloud_ipd/services/calendar.py | 64 +- src/pyicloud_ipd/services/contacts.py | 56 +- src/pyicloud_ipd/services/drive.py | 362 +++++++++ src/pyicloud_ipd/services/findmyiphone.py | 169 ++--- src/pyicloud_ipd/services/photos.py | 850 ++++++++++------------ src/pyicloud_ipd/services/reminders.py | 174 ++--- src/pyicloud_ipd/services/ubiquity.py | 120 ++- src/pyicloud_ipd/utils.py | 28 +- 16 files changed, 1517 insertions(+), 914 deletions(-) create mode 100644 src/pyicloud_ipd/services/drive.py diff --git a/src/icloudpd/authentication.py b/src/icloudpd/authentication.py index 861b03af0..b0bcf321c 100644 --- a/src/icloudpd/authentication.py +++ b/src/icloudpd/authentication.py @@ -34,7 +34,7 @@ def authenticate_( client_id=client_id, ) break - except pyicloud_ipd.exceptions.NoStoredPasswordAvailable: + except pyicloud_ipd.exceptions.PyiCloudNoStoredPasswordAvailableException: # Prompt for password if not stored in PyiCloud's keyring password = click.prompt("iCloud Password", hide_input=True) diff --git a/src/icloudpd/base.py b/src/icloudpd/base.py index bf5c299b0..ca09d6adf 100644 --- a/src/icloudpd/base.py +++ b/src/icloudpd/base.py @@ -19,7 +19,7 @@ from tzlocal import get_localzone from pyicloud_ipd import PyiCloudService -from pyicloud_ipd.exceptions import PyiCloudAPIResponseError +from pyicloud_ipd.exceptions import PyiCloudAPIResponseException from pyicloud_ipd.services.photos import PhotoAsset from icloudpd.authentication import authenticator, TwoStepAuthRequiredError @@ -788,7 +788,7 @@ def core( logger.error("Unknown library: %s", library) return 1 photos = library_object.albums[album] - except PyiCloudAPIResponseError as err: + except PyiCloudAPIResponseException as err: # For later: come up with a nicer message to the user. For now take the # exception text logger.error("error?? %s", err) diff --git a/src/pyicloud_ipd/__init__.py b/src/pyicloud_ipd/__init__.py index b65b1f9bb..dd84cc538 100644 --- a/src/pyicloud_ipd/__init__.py +++ b/src/pyicloud_ipd/__init__.py @@ -1,3 +1,4 @@ +"""The pyiCloud library.""" import logging from pyicloud_ipd.base import PyiCloudService diff --git a/src/pyicloud_ipd/base.py b/src/pyicloud_ipd/base.py index e53f6f502..78e2d08ee 100644 --- a/src/pyicloud_ipd/base.py +++ b/src/pyicloud_ipd/base.py @@ -1,3 +1,4 @@ +"""Library base file.""" from uuid import uuid1 import inspect import json @@ -10,11 +11,10 @@ import getpass from pyicloud_ipd.exceptions import ( - PyiCloudConnectionException, PyiCloudFailedLoginException, - PyiCloudAPIResponseError, - PyiCloud2SARequiredError, - PyiCloudServiceNotActivatedErrror + PyiCloudAPIResponseException, + PyiCloud2SARequiredException, + PyiCloudServiceNotActivatedException, ) from pyicloud_ipd.services import ( FindMyiPhoneServiceManager, @@ -23,10 +23,12 @@ ContactsService, RemindersService, PhotosService, - AccountService + AccountService, + DriveService, ) from pyicloud_ipd.utils import get_password_from_keyring + LOGGER = logging.getLogger(__name__) HEADER_DATA = { @@ -39,13 +41,15 @@ class PyiCloudPasswordFilter(logging.Filter): + """Password log hider.""" + def __init__(self, password): - self.password = password + super().__init__(password) def filter(self, record): message = record.getMessage() - if self.password in message: - record.msg = message.replace(self.password, "*" * 8) + if self.name in message: + record.msg = message.replace(self.name, "*" * 8) record.args = [] return True diff --git a/src/pyicloud_ipd/cmdline.py b/src/pyicloud_ipd/cmdline.py index 3116fa924..5734be521 100644 --- a/src/pyicloud_ipd/cmdline.py +++ b/src/pyicloud_ipd/cmdline.py @@ -1,54 +1,46 @@ #! /usr/bin/env python -# -*- coding: utf-8 -*- """ A Command Line Wrapper to allow easy use of pyicloud for command line scripts, and related. """ -from __future__ import print_function import argparse import pickle import sys from click import confirm -import pyicloud_ipd +from pyicloud_ipd import PyiCloudService +from pyicloud_ipd.exceptions import PyiCloudFailedLoginException from . import utils - -DEVICE_ERROR = ( - "Please use the --device switch to indicate which device to use." -) +DEVICE_ERROR = "Please use the --device switch to indicate which device to use." def create_pickled_data(idevice, filename): - """This helper will output the idevice to a pickled file named + """ + This helper will output the idevice to a pickled file named after the passed filename. This allows the data to be used without resorting to screen / pipe - scrapping. """ - data = {} - for x in idevice.content: - data[x] = idevice.content[x] - location = filename - pickle_file = open(location, 'wb') - pickle.dump(data, pickle_file, protocol=pickle.HIGHEST_PROTOCOL) - pickle_file.close() + scrapping. + """ + with open(filename, "wb") as pickle_file: + pickle.dump(idevice.content, pickle_file, protocol=pickle.HIGHEST_PROTOCOL) def main(args=None): - """Main commandline entrypoint""" + """Main commandline entrypoint.""" if args is None: args = sys.argv[1:] - parser = argparse.ArgumentParser( - description="Find My iPhone CommandLine Tool") + parser = argparse.ArgumentParser(description="Find My iPhone CommandLine Tool") parser.add_argument( "--username", action="store", dest="username", default="", - help="Apple ID to Use" + help="Apple ID to Use", ) parser.add_argument( "--password", @@ -58,7 +50,7 @@ def main(args=None): help=( "Apple ID Password to Use; if unspecified, password will be " "fetched from the system keyring." - ) + ), ) parser.add_argument( "-n", @@ -66,7 +58,7 @@ def main(args=None): action="store_false", dest="interactive", default=True, - help="Disable interactive prompts." + help="Disable interactive prompts.", ) parser.add_argument( "--delete-from-keyring", @@ -97,7 +89,7 @@ def main(args=None): help="Retrieve Location for the iDevice (non-exclusive).", ) - # Restrict actions to a specific devices UID / DID + # Restrict actions to a specific devices UID / DID parser.add_argument( "--device", action="store", @@ -106,7 +98,7 @@ def main(args=None): help="Only effect this device", ) - # Trigger Sound Alert + # Trigger Sound Alert parser.add_argument( "--sound", action="store_true", @@ -115,7 +107,7 @@ def main(args=None): help="Play a sound on the device", ) - # Trigger Message w/Sound Alert + # Trigger Message w/Sound Alert parser.add_argument( "--message", action="store", @@ -124,7 +116,7 @@ def main(args=None): help="Optional Text Message to display with a sound", ) - # Trigger Message (without Sound) Alert + # Trigger Message (without Sound) Alert parser.add_argument( "--silentmessage", action="store", @@ -133,7 +125,7 @@ def main(args=None): help="Optional Text Message to display with no sounds", ) - # Lost Mode + # Lost Mode parser.add_argument( "--lostmode", action="store_true", @@ -163,7 +155,7 @@ def main(args=None): help="Forcibly display this message when activating lost mode.", ) - # Output device data to an pickle file + # Output device data to an pickle file parser.add_argument( "--outputfile", action="store_true", @@ -172,19 +164,10 @@ def main(args=None): help="Save device data to a file in the current directory.", ) - parser.add_argument( - "--domain", - action="store", - dest="domain", - default="com", - help="Root Domain for requests to iCloud. com or cn", - ) - command_line = parser.parse_args(args) username = command_line.username password = command_line.password - domain = command_line.domain if username and command_line.delete_from_keyring: utils.delete_password_in_keyring(username) @@ -194,56 +177,76 @@ def main(args=None): # Which password we use is determined by your username, so we # do need to check for this first and separately. if not username: - parser.error('No username supplied') + parser.error("No username supplied") if not password: password = utils.get_password( - username, - interactive=command_line.interactive + username, interactive=command_line.interactive ) if not password: - parser.error('No password supplied') + parser.error("No password supplied") try: - api = pyicloud_ipd.PyiCloudService( - domain, - username.strip(), - password.strip() - ) + api = PyiCloudService(username.strip(), password.strip()) if ( - not utils.password_exists_in_keyring(username) and - command_line.interactive and - confirm("Save password in keyring? ") + not utils.password_exists_in_keyring(username) + and command_line.interactive + and confirm("Save password in keyring?") ): utils.store_password_in_keyring(username, password) - if api.requires_2sa: - import click - print("Two-step authentication required.", - "Your trusted devices are:") + if api.requires_2fa: + # fmt: off + print( + "\nTwo-step authentication required.", + "\nPlease enter validation code" + ) + # fmt: on + + code = input("(string) --> ") + if not api.validate_2fa_code(code): + print("Failed to verify verification code") + sys.exit(1) + + print("") + + elif api.requires_2sa: + # fmt: off + print( + "\nTwo-step authentication required.", + "\nYour trusted devices are:" + ) + # fmt: on devices = api.trusted_devices for i, device in enumerate(devices): - print(" %s: %s" % ( - i, device.get( - 'deviceName', - "SMS to %s" % device.get('phoneNumber')))) + print( + " %s: %s" + % ( + i, + device.get( + "deviceName", "SMS to %s" % device.get("phoneNumber") + ), + ) + ) - device = click.prompt('Which device would you like to use?', - default=0) + print("\nWhich device would you like to use?") + device = int(input("(number) --> ")) device = devices[device] if not api.send_verification_code(device): print("Failed to send verification code") sys.exit(1) - code = click.prompt('Please enter validation code') + print("\nPlease enter validation code") + code = input("(string) --> ") if not api.validate_verification_code(device, code): print("Failed to verify verification code") sys.exit(1) + print("") break - except pyicloud_ipd.exceptions.PyiCloudFailedLoginException: + except PyiCloudFailedLoginException as err: # If they have a stored password; we just used it and # it did not work; let's delete it if there is one. if utils.password_exists_in_keyring(username): @@ -256,38 +259,32 @@ def main(args=None): failure_count += 1 if failure_count >= 3: - raise RuntimeError(message) + raise RuntimeError(message) from err print(message, file=sys.stderr) for dev in api.devices: - if ( - not command_line.device_id or - ( - command_line.device_id.strip().lower() == - dev.content["id"].strip().lower() - ) + if not command_line.device_id or ( + command_line.device_id.strip().lower() == dev.content["id"].strip().lower() ): - # List device(s) + # List device(s) if command_line.locate: dev.location() if command_line.output_to_file: create_pickled_data( dev, - filename=( - dev.content["name"].strip().lower() + ".fmip_snapshot" - ) + filename=(dev.content["name"].strip().lower() + ".fmip_snapshot"), ) contents = dev.content if command_line.longlist: - print("-"*30) + print("-" * 30) print(contents["name"]) - for x in contents: - print("%20s - %s" % (x, contents[x])) + for key in contents: + print("%20s - %s" % (key, contents[key])) elif command_line.list: - print("-"*30) + print("-" * 30) print("Name - %s" % contents["name"]) print("Display Name - %s" % contents["deviceDisplayName"]) print("Location - %s" % contents["location"]) @@ -296,68 +293,70 @@ def main(args=None): print("Device Class - %s" % contents["deviceClass"]) print("Device Model - %s" % contents["deviceModel"]) - # Play a Sound on a device + # Play a Sound on a device if command_line.sound: if command_line.device_id: dev.play_sound() else: raise RuntimeError( - "\n\n\t\t%s %s\n\n" % ( + "\n\n\t\t%s %s\n\n" + % ( "Sounds can only be played on a singular device.", - DEVICE_ERROR + DEVICE_ERROR, ) ) - # Display a Message on the device + # Display a Message on the device if command_line.message: if command_line.device_id: dev.display_message( - subject='A Message', - message=command_line.message, - sounds=True + subject="A Message", message=command_line.message, sounds=True ) else: raise RuntimeError( - "%s %s" % ( - "Messages can only be played " - "on a singular device.", - DEVICE_ERROR + "%s %s" + % ( + "Messages can only be played on a singular device.", + DEVICE_ERROR, ) ) - # Display a Silent Message on the device + # Display a Silent Message on the device if command_line.silentmessage: if command_line.device_id: dev.display_message( - subject='A Silent Message', + subject="A Silent Message", message=command_line.silentmessage, - sounds=False + sounds=False, ) else: raise RuntimeError( - "%s %s" % ( + "%s %s" + % ( "Silent Messages can only be played " "on a singular device.", - DEVICE_ERROR + DEVICE_ERROR, ) ) - # Enable Lost mode + # Enable Lost mode if command_line.lostmode: if command_line.device_id: dev.lost_device( number=command_line.lost_phone.strip(), text=command_line.lost_message.strip(), - newpasscode=command_line.lost_password.strip() + newpasscode=command_line.lost_password.strip(), ) else: raise RuntimeError( - "%s %s" % ( - "Lost Mode can only be activated " - "on a singular device.", - DEVICE_ERROR + "%s %s" + % ( + "Lost Mode can only be activated on a singular device.", + DEVICE_ERROR, ) ) + sys.exit(0) + -if __name__ == '__main__': +if __name__ == "__main__": main() diff --git a/src/pyicloud_ipd/exceptions.py b/src/pyicloud_ipd/exceptions.py index ef9c91729..80b7a6919 100644 --- a/src/pyicloud_ipd/exceptions.py +++ b/src/pyicloud_ipd/exceptions.py @@ -1,39 +1,50 @@ - -class PyiCloudException(Exception): - pass +"""Library exceptions.""" -class PyiCloudConnectionException(PyiCloudException): - pass - -class PyiCloudNoDevicesException(PyiCloudException): +class PyiCloudException(Exception): + """Generic iCloud exception.""" pass -class PyiCloudAPIResponseError(PyiCloudException): - def __init__(self, reason, code): +# API +class PyiCloudAPIResponseException(PyiCloudException): + """iCloud response exception.""" + def __init__(self, reason, code=None, retry=False): self.reason = reason self.code = code - message = reason + message = reason or "" if code: message += " (%s)" % code + if retry: + message += ". Retrying ..." + + super().__init__(message) - super(PyiCloudAPIResponseError, self).__init__(message) +class PyiCloudServiceNotActivatedException(PyiCloudAPIResponseException): + """iCloud service not activated exception.""" + pass + +# Login class PyiCloudFailedLoginException(PyiCloudException): + """iCloud failed login exception.""" pass -class PyiCloud2SARequiredError(PyiCloudException): - def __init__(self, url): - message = "Two-step authentication required for %s" % url - super(PyiCloud2SARequiredError, self).__init__(message) +class PyiCloud2SARequiredException(PyiCloudException): + """iCloud 2SA required exception.""" + def __init__(self, apple_id): + message = "Two-step authentication required for account: %s" % apple_id + super().__init__(message) -class NoStoredPasswordAvailable(PyiCloudException): +class PyiCloudNoStoredPasswordAvailableException(PyiCloudException): + """iCloud no stored password exception.""" pass -class PyiCloudServiceNotActivatedErrror(PyiCloudAPIResponseError): +# Webservice specific +class PyiCloudNoDevicesException(PyiCloudException): + """iCloud no device exception.""" pass diff --git a/src/pyicloud_ipd/services/__init__.py b/src/pyicloud_ipd/services/__init__.py index ca193291d..8629068d7 100644 --- a/src/pyicloud_ipd/services/__init__.py +++ b/src/pyicloud_ipd/services/__init__.py @@ -1,3 +1,4 @@ +"""Services.""" from pyicloud_ipd.services.calendar import CalendarService from pyicloud_ipd.services.findmyiphone import FindMyiPhoneServiceManager from pyicloud_ipd.services.ubiquity import UbiquityService @@ -5,3 +6,4 @@ from pyicloud_ipd.services.reminders import RemindersService from pyicloud_ipd.services.photos import PhotosService from pyicloud_ipd.services.account import AccountService +from pyicloud_ipd.services.drive import DriveService diff --git a/src/pyicloud_ipd/services/account.py b/src/pyicloud_ipd/services/account.py index f172bd71e..86b554fdc 100644 --- a/src/pyicloud_ipd/services/account.py +++ b/src/pyicloud_ipd/services/account.py @@ -1,55 +1,330 @@ -import sys - -import six +"""Account service.""" +from collections import OrderedDict from pyicloud_ipd.utils import underscore_to_camelcase -class AccountService(object): +class AccountService: + """The 'Account' iCloud service.""" + def __init__(self, service_root, session, params): self.session = session self.params = params self._service_root = service_root - self._devices = [] - - self._acc_endpoint = '%s/setup/web/device' % self._service_root - self._account_devices_url = '%s/getDevices' % self._acc_endpoint - req = self.session.get(self._account_devices_url, params=self.params) - self.response = req.json() + self._devices = [] + self._family = [] + self._storage = None - for device_info in self.response['devices']: - # device_id = device_info['udid'] - # self._devices[device_id] = AccountDevice(device_info) - self._devices.append(AccountDevice(device_info)) + self._acc_endpoint = "%s/setup/web" % self._service_root + self._acc_devices_url = "%s/device/getDevices" % self._acc_endpoint + self._acc_family_details_url = "%s/family/getFamilyDetails" % self._acc_endpoint + self._acc_family_member_photo_url = ( + "%s/family/getMemberPhoto" % self._acc_endpoint + ) + self._acc_storage_url = "https://setup.icloud.com/setup/ws/1/storageUsageInfo" @property def devices(self): + """Returns current paired devices.""" + if not self._devices: + req = self.session.get(self._acc_devices_url, params=self.params) + response = req.json() + + for device_info in response["devices"]: + self._devices.append(AccountDevice(device_info)) + return self._devices + @property + def family(self): + """Returns family members.""" + if not self._family: + req = self.session.get(self._acc_family_details_url, params=self.params) + response = req.json() + + for member_info in response["familyMembers"]: + self._family.append( + FamilyMember( + member_info, + self.session, + self.params, + self._acc_family_member_photo_url, + ) + ) + + return self._family + + @property + def storage(self): + """Returns storage infos.""" + if not self._storage: + req = self.session.get(self._acc_storage_url, params=self.params) + response = req.json() + + self._storage = AccountStorage(response) + + return self._storage + + def __str__(self): + return "{{devices: {}, family: {}, storage: {} bytes free}}".format( + len(self.devices), + len(self.family), + self.storage.usage.available_storage_in_bytes, + ) + + def __repr__(self): + return f"<{type(self).__name__}: {self}>" + -@six.python_2_unicode_compatible class AccountDevice(dict): - def __init__(self, device_info): - super(AccountDevice, self).__init__(device_info) + """Account device.""" - def __getattr__(self, name): - try: - return self[underscore_to_camelcase(name)] - except KeyError: - raise AttributeError(name) + def __getattr__(self, key): + return self[underscore_to_camelcase(key)] def __str__(self): - return u"{display_name}: {name}".format( - display_name=self.model_display_name, - name=self.name, + return f"{{model: {self.model_display_name}, name: {self.name}}}" + + def __repr__(self): + return f"<{type(self).__name__}: {self}>" + + +class FamilyMember: + """A family member.""" + + def __init__(self, member_info, session, params, acc_family_member_photo_url): + self._attrs = member_info + self._session = session + self._params = params + self._acc_family_member_photo_url = acc_family_member_photo_url + + @property + def last_name(self): + """Gets the last name.""" + return self._attrs.get("lastName") + + @property + def dsid(self): + """Gets the dsid.""" + return self._attrs.get("dsid") + + @property + def original_invitation_email(self): + """Gets the original invitation.""" + return self._attrs.get("originalInvitationEmail") + + @property + def full_name(self): + """Gets the full name.""" + return self._attrs.get("fullName") + + @property + def age_classification(self): + """Gets the age classification.""" + return self._attrs.get("ageClassification") + + @property + def apple_id_for_purchases(self): + """Gets the apple id for purchases.""" + return self._attrs.get("appleIdForPurchases") + + @property + def apple_id(self): + """Gets the apple id.""" + return self._attrs.get("appleId") + + @property + def family_id(self): + """Gets the family id.""" + return self._attrs.get("familyId") + + @property + def first_name(self): + """Gets the first name.""" + return self._attrs.get("firstName") + + @property + def has_parental_privileges(self): + """Has parental privileges.""" + return self._attrs.get("hasParentalPrivileges") + + @property + def has_screen_time_enabled(self): + """Has screen time enabled.""" + return self._attrs.get("hasScreenTimeEnabled") + + @property + def has_ask_to_buy_enabled(self): + """Has to ask for buying.""" + return self._attrs.get("hasAskToBuyEnabled") + + @property + def has_share_purchases_enabled(self): + """Has share purshases.""" + return self._attrs.get("hasSharePurchasesEnabled") + + @property + def share_my_location_enabled_family_members(self): + """Has share my location with family.""" + return self._attrs.get("shareMyLocationEnabledFamilyMembers") + + @property + def has_share_my_location_enabled(self): + """Has share my location.""" + return self._attrs.get("hasShareMyLocationEnabled") + + @property + def dsid_for_purchases(self): + """Gets the dsid for purchases.""" + return self._attrs.get("dsidForPurchases") + + def get_photo(self): + """Returns the photo.""" + params_photo = dict(self._params) + params_photo.update({"memberId": self.dsid}) + return self._session.get( + self._acc_family_member_photo_url, params=params_photo, stream=True + ) + + def __getitem__(self, key): + if self._attrs.get(key): + return self._attrs[key] + return getattr(self, key) + + def __str__(self): + return "{{name: {}, age_classification: {}}}".format( + self.full_name, + self.age_classification, ) def __repr__(self): - return '<{display}>'.format( - display=( - six.text_type(self) - if sys.version_info[0] >= 3 else - six.text_type(self).encode('utf8', 'replace') - ) + return f"<{type(self).__name__}: {self}>" + + +class AccountStorageUsageForMedia: + """Storage used for a specific media type into the account.""" + + def __init__(self, usage_data): + self.usage_data = usage_data + + @property + def key(self): + """Gets the key.""" + return self.usage_data["mediaKey"] + + @property + def label(self): + """Gets the label.""" + return self.usage_data["displayLabel"] + + @property + def color(self): + """Gets the HEX color.""" + return self.usage_data["displayColor"] + + @property + def usage_in_bytes(self): + """Gets the usage in bytes.""" + return self.usage_data["usageInBytes"] + + def __str__(self): + return f"{{key: {self.key}, usage: {self.usage_in_bytes} bytes}}" + + def __repr__(self): + return f"<{type(self).__name__}: {self}>" + + +class AccountStorageUsage: + """Storage used for a specific media type into the account.""" + + def __init__(self, usage_data, quota_data): + self.usage_data = usage_data + self.quota_data = quota_data + + @property + def comp_storage_in_bytes(self): + """Gets the comp storage in bytes.""" + return self.usage_data["compStorageInBytes"] + + @property + def used_storage_in_bytes(self): + """Gets the used storage in bytes.""" + return self.usage_data["usedStorageInBytes"] + + @property + def used_storage_in_percent(self): + """Gets the used storage in percent.""" + return round(self.used_storage_in_bytes * 100 / self.total_storage_in_bytes, 2) + + @property + def available_storage_in_bytes(self): + """Gets the available storage in bytes.""" + return self.total_storage_in_bytes - self.used_storage_in_bytes + + @property + def available_storage_in_percent(self): + """Gets the available storage in percent.""" + return round( + self.available_storage_in_bytes * 100 / self.total_storage_in_bytes, 2 ) + + @property + def total_storage_in_bytes(self): + """Gets the total storage in bytes.""" + return self.usage_data["totalStorageInBytes"] + + @property + def commerce_storage_in_bytes(self): + """Gets the commerce storage in bytes.""" + return self.usage_data["commerceStorageInBytes"] + + @property + def quota_over(self): + """Gets the over quota.""" + return self.quota_data["overQuota"] + + @property + def quota_tier_max(self): + """Gets the max tier quota.""" + return self.quota_data["haveMaxQuotaTier"] + + @property + def quota_almost_full(self): + """Gets the almost full quota.""" + return self.quota_data["almost-full"] + + @property + def quota_paid(self): + """Gets the paid quota.""" + return self.quota_data["paidQuota"] + + def __str__(self): + return "{}% used of {} bytes".format( + self.used_storage_in_percent, + self.total_storage_in_bytes, + ) + + def __repr__(self): + return f"<{type(self).__name__}: {self}>" + + +class AccountStorage: + """Storage of the account.""" + + def __init__(self, storage_data): + self.usage = AccountStorageUsage( + storage_data.get("storageUsageInfo"), storage_data.get("quotaStatus") + ) + self.usages_by_media = OrderedDict() + + for usage_media in storage_data.get("storageUsageByMedia"): + self.usages_by_media[usage_media["mediaKey"]] = AccountStorageUsageForMedia( + usage_media + ) + + def __str__(self): + return f"{{usage: {self.usage}, usages_by_media: {self.usages_by_media}}}" + + def __repr__(self): + return f"<{type(self).__name__}: {self}>" diff --git a/src/pyicloud_ipd/services/calendar.py b/src/pyicloud_ipd/services/calendar.py index 49cf11889..1c7687fc7 100644 --- a/src/pyicloud_ipd/services/calendar.py +++ b/src/pyicloud_ipd/services/calendar.py @@ -1,24 +1,25 @@ -from __future__ import absolute_import -from datetime import datetime, timedelta +"""Calendar service.""" +from datetime import datetime from calendar import monthrange -import time -from tzlocal import get_localzone +from tzlocal import get_localzone_name -class CalendarService(object): +class CalendarService: """ The 'Calendar' iCloud service, connects to iCloud and returns events. """ + def __init__(self, service_root, session, params): self.session = session self.params = params self._service_root = service_root - self._calendar_endpoint = '%s/ca' % self._service_root - self._calendar_refresh_url = '%s/events' % self._calendar_endpoint - self._calendar_event_detail_url = '%s/eventdetail' % ( - self._calendar_endpoint, - ) + self._calendar_endpoint = "%s/ca" % self._service_root + self._calendar_refresh_url = "%s/events" % self._calendar_endpoint + self._calendar_event_detail_url = f"{self._calendar_endpoint}/eventdetail" + self._calendars = "%s/startup" % self._calendar_endpoint + + self.response = {} def get_event_detail(self, pguid, guid): """ @@ -26,11 +27,11 @@ def get_event_detail(self, pguid, guid): (a calendar) and a guid (an event's ID). """ params = dict(self.params) - params.update({'lang': 'en-us', 'usertz': get_localzone().zone}) - url = '%s/%s/%s' % (self._calendar_event_detail_url, pguid, guid) + params.update({"lang": "en-us", "usertz": get_localzone_name()}) + url = f"{self._calendar_event_detail_url}/{pguid}/{guid}" req = self.session.get(url, params=params) self.response = req.json() - return self.response['Event'][0] + return self.response["Event"][0] def refresh_client(self, from_dt=None, to_dt=None): """ @@ -45,12 +46,14 @@ def refresh_client(self, from_dt=None, to_dt=None): if not to_dt: to_dt = datetime(today.year, today.month, last_day) params = dict(self.params) - params.update({ - 'lang': 'en-us', - 'usertz': get_localzone().zone, - 'startDate': from_dt.strftime('%Y-%m-%d'), - 'endDate': to_dt.strftime('%Y-%m-%d') - }) + params.update( + { + "lang": "en-us", + "usertz": get_localzone_name(), + "startDate": from_dt.strftime("%Y-%m-%d"), + "endDate": to_dt.strftime("%Y-%m-%d"), + } + ) req = self.session.get(self._calendar_refresh_url, params=params) self.response = req.json() @@ -59,4 +62,25 @@ def events(self, from_dt=None, to_dt=None): Retrieves events for a given date range, by default, this month. """ self.refresh_client(from_dt, to_dt) - return self.response['Event'] + return self.response.get("Event") + + def calendars(self): + """ + Retrieves calendars of this month. + """ + today = datetime.today() + first_day, last_day = monthrange(today.year, today.month) + from_dt = datetime(today.year, today.month, first_day) + to_dt = datetime(today.year, today.month, last_day) + params = dict(self.params) + params.update( + { + "lang": "en-us", + "usertz": get_localzone_name(), + "startDate": from_dt.strftime("%Y-%m-%d"), + "endDate": to_dt.strftime("%Y-%m-%d"), + } + ) + req = self.session.get(self._calendars, params=params) + self.response = req.json() + return self.response["Collection"] diff --git a/src/pyicloud_ipd/services/contacts.py b/src/pyicloud_ipd/services/contacts.py index a5a79e08a..b61742f9d 100644 --- a/src/pyicloud_ipd/services/contacts.py +++ b/src/pyicloud_ipd/services/contacts.py @@ -1,48 +1,48 @@ -from __future__ import absolute_import -import os -import uuid -from datetime import datetime -from calendar import monthrange +"""Contacts service.""" -class ContactsService(object): +class ContactsService: """ The 'Contacts' iCloud service, connects to iCloud and returns contacts. """ + def __init__(self, service_root, session, params): self.session = session self.params = params self._service_root = service_root - self._contacts_endpoint = '%s/co' % self._service_root - self._contacts_refresh_url = '%s/startup' % self._contacts_endpoint - self._contacts_changeset_url = '%s/changeset' % self._contacts_endpoint + self._contacts_endpoint = "%s/co" % self._service_root + self._contacts_refresh_url = "%s/startup" % self._contacts_endpoint + self._contacts_next_url = "%s/contacts" % self._contacts_endpoint + self._contacts_changeset_url = "%s/changeset" % self._contacts_endpoint + + self.response = {} - def refresh_client(self, from_dt=None, to_dt=None): + def refresh_client(self): """ Refreshes the ContactsService endpoint, ensuring that the contacts data is up-to-date. """ params_contacts = dict(self.params) - params_contacts.update({ - 'clientVersion': '2.1', - 'locale': 'en_US', - 'order': 'last,first', - }) - req = self.session.get( - self._contacts_refresh_url, - params=params_contacts + params_contacts.update( + { + "clientVersion": "2.1", + "locale": "en_US", + "order": "last,first", + } ) + req = self.session.get(self._contacts_refresh_url, params=params_contacts) self.response = req.json() - params_refresh = dict(self.params) - params_refresh.update({ - 'prefToken': req.json()["prefToken"], - 'syncToken': req.json()["syncToken"], - }) - self.session.post(self._contacts_changeset_url, params=params_refresh) - req = self.session.get( - self._contacts_refresh_url, - params=params_contacts + + params_next = dict(params_contacts) + params_next.update( + { + "prefToken": self.response["prefToken"], + "syncToken": self.response["syncToken"], + "limit": "0", + "offset": "0", + } ) + req = self.session.get(self._contacts_next_url, params=params_next) self.response = req.json() def all(self): @@ -50,4 +50,4 @@ def all(self): Retrieves all contacts. """ self.refresh_client() - return self.response['contacts'] + return self.response.get("contacts") diff --git a/src/pyicloud_ipd/services/drive.py b/src/pyicloud_ipd/services/drive.py new file mode 100644 index 000000000..6e51c6ae7 --- /dev/null +++ b/src/pyicloud_ipd/services/drive.py @@ -0,0 +1,362 @@ +"""Drive service.""" +from datetime import datetime, timedelta +import json +import logging +import io +import mimetypes +import os +import time +from re import search +from requests import Response + +from pyicloud_ipd.exceptions import PyiCloudAPIResponseException + + +LOGGER = logging.getLogger(__name__) + + +class DriveService: + """The 'Drive' iCloud service.""" + + def __init__(self, service_root, document_root, session, params): + self._service_root = service_root + self._document_root = document_root + self.session = session + self.params = dict(params) + self._root = None + + def _get_token_from_cookie(self): + for cookie in self.session.cookies: + if cookie.name == "X-APPLE-WEBAUTH-VALIDATE": + match = search(r"\bt=([^:]+)", cookie.value) + if match is None: + raise Exception("Can't extract token from %r" % cookie.value) + return {"token": match.group(1)} + raise Exception("Token cookie not found") + + def get_node_data(self, node_id): + """Returns the node data.""" + request = self.session.post( + self._service_root + "/retrieveItemDetailsInFolders", + params=self.params, + data=json.dumps( + [ + { + "drivewsid": "FOLDER::com.apple.CloudDocs::%s" % node_id, + "partialData": False, + } + ] + ), + ) + self._raise_if_error(request) + return request.json()[0] + + def get_file(self, file_id, **kwargs): + """Returns iCloud Drive file.""" + file_params = dict(self.params) + file_params.update({"document_id": file_id}) + response = self.session.get( + self._document_root + "/ws/com.apple.CloudDocs/download/by_id", + params=file_params, + ) + self._raise_if_error(response) + response_json = response.json() + package_token = response_json.get("package_token") + data_token = response_json.get("data_token") + if data_token and data_token.get("url"): + return self.session.get(data_token["url"], params=self.params, **kwargs) + if package_token and package_token.get("url"): + return self.session.get(package_token["url"], params=self.params, **kwargs) + raise KeyError("'data_token' nor 'package_token'") + + def get_app_data(self): + """Returns the app library (previously ubiquity).""" + request = self.session.get( + self._service_root + "/retrieveAppLibraries", params=self.params + ) + self._raise_if_error(request) + return request.json()["items"] + + def _get_upload_contentws_url(self, file_object): + """Get the contentWS endpoint URL to add a new file.""" + content_type = mimetypes.guess_type(file_object.name)[0] + if content_type is None: + content_type = "" + + # Get filesize from file object + orig_pos = file_object.tell() + file_object.seek(0, os.SEEK_END) + file_size = file_object.tell() + file_object.seek(orig_pos, os.SEEK_SET) + + file_params = self.params + file_params.update(self._get_token_from_cookie()) + + request = self.session.post( + self._document_root + "/ws/com.apple.CloudDocs/upload/web", + params=file_params, + headers={"Content-Type": "text/plain"}, + data=json.dumps( + { + "filename": file_object.name, + "type": "FILE", + "content_type": content_type, + "size": file_size, + } + ), + ) + self._raise_if_error(request) + return (request.json()[0]["document_id"], request.json()[0]["url"]) + + def _update_contentws(self, folder_id, sf_info, document_id, file_object): + data = { + "data": { + "signature": sf_info["fileChecksum"], + "wrapping_key": sf_info["wrappingKey"], + "reference_signature": sf_info["referenceChecksum"], + "size": sf_info["size"], + }, + "command": "add_file", + "create_short_guid": True, + "document_id": document_id, + "path": { + "starting_document_id": folder_id, + "path": file_object.name, + }, + "allow_conflict": True, + "file_flags": { + "is_writable": True, + "is_executable": False, + "is_hidden": False, + }, + "mtime": int(time.time() * 1000), + "btime": int(time.time() * 1000), + } + + # Add the receipt if we have one. Will be absent for 0-sized files + if sf_info.get("receipt"): + data["data"].update({"receipt": sf_info["receipt"]}) + + request = self.session.post( + self._document_root + "/ws/com.apple.CloudDocs/update/documents", + params=self.params, + headers={"Content-Type": "text/plain"}, + data=json.dumps(data), + ) + self._raise_if_error(request) + return request.json() + + def send_file(self, folder_id, file_object): + """Send new file to iCloud Drive.""" + document_id, content_url = self._get_upload_contentws_url(file_object) + + request = self.session.post(content_url, files={file_object.name: file_object}) + self._raise_if_error(request) + content_response = request.json()["singleFile"] + self._update_contentws(folder_id, content_response, document_id, file_object) + + def create_folders(self, parent, name): + """Creates a new iCloud Drive folder""" + request = self.session.post( + self._service_root + "/createFolders", + params=self.params, + headers={"Content-Type": "text/plain"}, + data=json.dumps( + { + "destinationDrivewsId": parent, + "folders": [ + { + "clientId": self.params["clientId"], + "name": name, + } + ], + } + ), + ) + self._raise_if_error(request) + return request.json() + + def rename_items(self, node_id, etag, name): + """Renames an iCloud Drive node""" + request = self.session.post( + self._service_root + "/renameItems", + params=self.params, + data=json.dumps( + { + "items": [ + { + "drivewsid": node_id, + "etag": etag, + "name": name, + } + ], + } + ), + ) + self._raise_if_error(request) + return request.json() + + def move_items_to_trash(self, node_id, etag): + """Moves an iCloud Drive node to the trash bin""" + request = self.session.post( + self._service_root + "/moveItemsToTrash", + params=self.params, + data=json.dumps( + { + "items": [ + { + "drivewsid": node_id, + "etag": etag, + "clientId": self.params["clientId"], + } + ], + } + ), + ) + self._raise_if_error(request) + return request.json() + + @property + def root(self): + """Returns the root node.""" + if not self._root: + self._root = DriveNode(self, self.get_node_data("root")) + return self._root + + def __getattr__(self, attr): + return getattr(self.root, attr) + + def __getitem__(self, key): + return self.root[key] + + def _raise_if_error(self, response): # pylint: disable=no-self-use + if not response.ok: + api_error = PyiCloudAPIResponseException( + response.reason, response.status_code + ) + LOGGER.error(api_error) + raise api_error + + +class DriveNode: + """Drive node.""" + + def __init__(self, conn, data): + self.data = data + self.connection = conn + self._children = None + + @property + def name(self): + """Gets the node name.""" + if "extension" in self.data: + return "{}.{}".format(self.data["name"], self.data["extension"]) + return self.data["name"] + + @property + def type(self): + """Gets the node type.""" + node_type = self.data.get("type") + return node_type and node_type.lower() + + def get_children(self): + """Gets the node children.""" + if not self._children: + if "items" not in self.data: + self.data.update(self.connection.get_node_data(self.data["docwsid"])) + if "items" not in self.data: + raise KeyError("No items in folder, status: %s" % self.data["status"]) + self._children = [ + DriveNode(self.connection, item_data) + for item_data in self.data["items"] + ] + return self._children + + @property + def size(self): + """Gets the node size.""" + size = self.data.get("size") # Folder does not have size + if not size: + return None + return int(size) + + @property + def date_changed(self): + """Gets the node changed date (in UTC).""" + return _date_to_utc(self.data.get("dateChanged")) # Folder does not have date + + @property + def date_modified(self): + """Gets the node modified date (in UTC).""" + return _date_to_utc(self.data.get("dateModified")) # Folder does not have date + + @property + def date_last_open(self): + """Gets the node last open date (in UTC).""" + return _date_to_utc(self.data.get("lastOpenTime")) # Folder does not have date + + def open(self, **kwargs): + """Gets the node file.""" + # iCloud returns 400 Bad Request for 0-byte files + if self.data["size"] == 0: + response = Response() + response.raw = io.BytesIO() + return response + return self.connection.get_file(self.data["docwsid"], **kwargs) + + def upload(self, file_object, **kwargs): + """Upload a new file.""" + return self.connection.send_file(self.data["docwsid"], file_object, **kwargs) + + def dir(self): + """Gets the node list of directories.""" + if self.type == "file": + return None + return [child.name for child in self.get_children()] + + def mkdir(self, folder): + """Create a new directory directory.""" + return self.connection.create_folders(self.data["drivewsid"], folder) + + def rename(self, name): + """Rename an iCloud Drive item.""" + return self.connection.rename_items( + self.data["drivewsid"], self.data["etag"], name + ) + + def delete(self): + """Delete an iCloud Drive item.""" + return self.connection.move_items_to_trash( + self.data["drivewsid"], self.data["etag"] + ) + + def get(self, name): + """Gets the node child.""" + if self.type == "file": + return None + return [child for child in self.get_children() if child.name == name][0] + + def __getitem__(self, key): + try: + return self.get(key) + except IndexError as i: + raise KeyError(f"No child named '{key}' exists") from i + + def __str__(self): + return rf"\{type: {self.type}, name: {self.name}\}" + + def __repr__(self): + return f"<{type(self).__name__}: {str(self)}>" + + +def _date_to_utc(date): + if not date: + return None + # jump through hoops to return time in UTC rather than California time + match = search(r"^(.+?)([\+\-]\d+):(\d\d)$", date) + if not match: + # Already in UTC + return datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ") + base = datetime.strptime(match.group(1), "%Y-%m-%dT%H:%M:%S") + diff = timedelta(hours=int(match.group(2)), minutes=int(match.group(3))) + return base - diff diff --git a/src/pyicloud_ipd/services/findmyiphone.py b/src/pyicloud_ipd/services/findmyiphone.py index a2a468f9d..0d94f4721 100644 --- a/src/pyicloud_ipd/services/findmyiphone.py +++ b/src/pyicloud_ipd/services/findmyiphone.py @@ -1,34 +1,32 @@ +"""Find my iPhone service.""" import json -import sys - -import six from pyicloud_ipd.exceptions import PyiCloudNoDevicesException -class FindMyiPhoneServiceManager(object): - """ The 'Find my iPhone' iCloud service +class FindMyiPhoneServiceManager: + """The 'Find my iPhone' iCloud service This connects to iCloud and return phone data including the near-realtime latitude and longitude. - """ - def __init__(self, service_root, session, params): + def __init__(self, service_root, session, params, with_family=False): self.session = session self.params = params - self._service_root = service_root - self._fmip_endpoint = '%s/fmipservice/client/web' % self._service_root - self._fmip_refresh_url = '%s/refreshClient' % self._fmip_endpoint - self._fmip_sound_url = '%s/playSound' % self._fmip_endpoint - self._fmip_message_url = '%s/sendMessage' % self._fmip_endpoint - self._fmip_lost_url = '%s/lostDevice' % self._fmip_endpoint + self.with_family = with_family + + fmip_endpoint = "%s/fmipservice/client/web" % service_root + self._fmip_refresh_url = "%s/refreshClient" % fmip_endpoint + self._fmip_sound_url = "%s/playSound" % fmip_endpoint + self._fmip_message_url = "%s/sendMessage" % fmip_endpoint + self._fmip_lost_url = "%s/lostDevice" % fmip_endpoint self._devices = {} self.refresh_client() def refresh_client(self): - """ Refreshes the FindMyiPhoneService endpoint, + """Refreshes the FindMyiPhoneService endpoint, This ensures that the location data is up-to-date. @@ -38,18 +36,19 @@ def refresh_client(self): params=self.params, data=json.dumps( { - 'clientContext': { - 'fmly': True, - 'shouldLocate': True, - 'selectedDevice': 'all', + "clientContext": { + "fmly": self.with_family, + "shouldLocate": True, + "selectedDevice": "all", + "deviceListVersion": 1, } } - ) + ), ) self.response = req.json() - for device_info in self.response['content']: - device_id = device_info['id'] + for device_info in self.response["content"]: + device_id = device_info["id"] if device_id not in self._devices: self._devices[device_id] = AppleDevice( device_info, @@ -68,33 +67,31 @@ def refresh_client(self): def __getitem__(self, key): if isinstance(key, int): - if six.PY3: - key = list(self.keys())[key] - else: - key = self.keys()[key] + key = list(self.keys())[key] return self._devices[key] def __getattr__(self, attr): return getattr(self._devices, attr) - def __unicode__(self): - return six.text_type(self._devices) - def __str__(self): - as_unicode = self.__unicode__() - if sys.version_info[0] >= 3: - return as_unicode - else: - return as_unicode.encode('ascii', 'ignore') + return f"{self._devices}" def __repr__(self): - return six.text_type(self) + return f"{self}" -class AppleDevice(object): +class AppleDevice: + """Apple device.""" + def __init__( - self, content, session, params, manager, - sound_url=None, lost_url=None, message_url=None + self, + content, + session, + params, + manager, + sound_url=None, + lost_url=None, + message_url=None, ): self.content = content self.manager = manager @@ -106,94 +103,84 @@ def __init__( self.message_url = message_url def update(self, data): + """Updates the device data.""" self.content = data def location(self): + """Updates the device location.""" self.manager.refresh_client() - return self.content['location'] + return self.content["location"] - def status(self, additional=[]): - """ Returns status information for device. + def status(self, additional=[]): # pylint: disable=dangerous-default-value + """Returns status information for device. This returns only a subset of possible properties. """ self.manager.refresh_client() - fields = ['batteryLevel', 'deviceDisplayName', 'deviceStatus', 'name'] + fields = ["batteryLevel", "deviceDisplayName", "deviceStatus", "name"] fields += additional properties = {} for field in fields: properties[field] = self.content.get(field) return properties - def play_sound(self, subject='Find My iPhone Alert'): - """ Send a request to the device to play a sound. + def play_sound(self, subject="Find My iPhone Alert"): + """Send a request to the device to play a sound. It's possible to pass a custom message by changing the `subject`. """ - data = json.dumps({ - 'device': self.content['id'], - 'subject': subject, - 'clientContext': { - 'fmly': True + data = json.dumps( + { + "device": self.content["id"], + "subject": subject, + "clientContext": {"fmly": True}, } - }) - self.session.post( - self.sound_url, - params=self.params, - data=data ) + self.session.post(self.sound_url, params=self.params, data=data) def display_message( - self, subject='Find My iPhone Alert', message="This is a note", - sounds=False + self, subject="Find My iPhone Alert", message="This is a note", sounds=False ): - """ Send a request to the device to play a sound. + """Send a request to the device to play a sound. It's possible to pass a custom message by changing the `subject`. """ data = json.dumps( { - 'device': self.content['id'], - 'subject': subject, - 'sound': sounds, - 'userText': True, - 'text': message + "device": self.content["id"], + "subject": subject, + "sound": sounds, + "userText": True, + "text": message, } ) - self.session.post( - self.message_url, - params=self.params, - data=data - ) + self.session.post(self.message_url, params=self.params, data=data) def lost_device( - self, number, - text='This iPhone has been lost. Please call me.', - newpasscode="" + self, number, text="This iPhone has been lost. Please call me.", newpasscode="" ): - """ Send a request to the device to trigger 'lost mode'. + """Send a request to the device to trigger 'lost mode'. The device will show the message in `text`, and if a number has been passed, then the person holding the device can call the number without entering the passcode. """ - data = json.dumps({ - 'text': text, - 'userText': True, - 'ownerNbr': number, - 'lostModeEnabled': True, - 'trackingEnabled': True, - 'device': self.content['id'], - 'passcode': newpasscode - }) - self.session.post( - self.lost_url, - params=self.params, - data=data + data = json.dumps( + { + "text": text, + "userText": True, + "ownerNbr": number, + "lostModeEnabled": True, + "trackingEnabled": True, + "device": self.content["id"], + "passcode": newpasscode, + } ) + self.session.post(self.lost_url, params=self.params, data=data) @property def data(self): + """Gets the device data.""" return self.content def __getitem__(self, key): @@ -202,20 +189,8 @@ def __getitem__(self, key): def __getattr__(self, attr): return getattr(self.content, attr) - def __unicode__(self): - display_name = self['deviceDisplayName'] - name = self['name'] - return '%s: %s' % ( - display_name, - name, - ) - def __str__(self): - as_unicode = self.__unicode__() - if sys.version_info[0] >= 3: - return as_unicode - else: - return as_unicode.encode('ascii', 'ignore') + return f"{self['deviceDisplayName']}: {self['name']}" def __repr__(self): - return '' % str(self) + return f"" diff --git a/src/pyicloud_ipd/services/photos.py b/src/pyicloud_ipd/services/photos.py index 2c53c4b1d..0e3882322 100644 --- a/src/pyicloud_ipd/services/photos.py +++ b/src/pyicloud_ipd/services/photos.py @@ -1,295 +1,245 @@ -import sys +"""Photo service.""" import json -import logging import base64 -import re - -from datetime import datetime -from pyicloud_ipd.exceptions import PyiCloudServiceNotActivatedErrror -from pyicloud_ipd.exceptions import PyiCloudAPIResponseError - -import pytz - from urllib.parse import urlencode -logger = logging.getLogger(__name__) +from datetime import datetime, timezone +from pyicloud_ipd.exceptions import PyiCloudServiceNotActivatedException -class PhotoLibrary(object): - """Represents a library in the user's photos. +class PhotosService: + """The 'Photos' iCloud service.""" - This provides access to all the albums as well as the photos. - """ SMART_FOLDERS = { "All Photos": { - "obj_type": "CPLAssetByAssetDateWithoutHiddenOrDeleted", - "list_type": "CPLAssetAndMasterByAssetDateWithoutHiddenOrDeleted", + "obj_type": "CPLAssetByAddedDate", + "list_type": "CPLAssetAndMasterByAddedDate", "direction": "ASCENDING", - "query_filter": None + "query_filter": None, }, "Time-lapse": { "obj_type": "CPLAssetInSmartAlbumByAssetDate:Timelapse", "list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate", "direction": "ASCENDING", - "query_filter": [{ - "fieldName": "smartAlbum", - "comparator": "EQUALS", - "fieldValue": { - "type": "STRING", - "value": "TIMELAPSE" + "query_filter": [ + { + "fieldName": "smartAlbum", + "comparator": "EQUALS", + "fieldValue": {"type": "STRING", "value": "TIMELAPSE"}, } - }] + ], }, "Videos": { "obj_type": "CPLAssetInSmartAlbumByAssetDate:Video", "list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate", "direction": "ASCENDING", - "query_filter": [{ - "fieldName": "smartAlbum", - "comparator": "EQUALS", - "fieldValue": { - "type": "STRING", - "value": "VIDEO" + "query_filter": [ + { + "fieldName": "smartAlbum", + "comparator": "EQUALS", + "fieldValue": {"type": "STRING", "value": "VIDEO"}, } - }] + ], }, "Slo-mo": { "obj_type": "CPLAssetInSmartAlbumByAssetDate:Slomo", "list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate", "direction": "ASCENDING", - "query_filter": [{ - "fieldName": "smartAlbum", - "comparator": "EQUALS", - "fieldValue": { - "type": "STRING", - "value": "SLOMO" + "query_filter": [ + { + "fieldName": "smartAlbum", + "comparator": "EQUALS", + "fieldValue": {"type": "STRING", "value": "SLOMO"}, } - }] + ], }, "Bursts": { "obj_type": "CPLAssetBurstStackAssetByAssetDate", "list_type": "CPLBurstStackAssetAndMasterByAssetDate", "direction": "ASCENDING", - "query_filter": None + "query_filter": None, }, "Favorites": { "obj_type": "CPLAssetInSmartAlbumByAssetDate:Favorite", "list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate", "direction": "ASCENDING", - "query_filter": [{ - "fieldName": "smartAlbum", - "comparator": "EQUALS", - "fieldValue": { - "type": "STRING", - "value": "FAVORITE" + "query_filter": [ + { + "fieldName": "smartAlbum", + "comparator": "EQUALS", + "fieldValue": {"type": "STRING", "value": "FAVORITE"}, } - }] + ], }, "Panoramas": { "obj_type": "CPLAssetInSmartAlbumByAssetDate:Panorama", "list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate", "direction": "ASCENDING", - "query_filter": [{ - "fieldName": "smartAlbum", - "comparator": "EQUALS", - "fieldValue": { - "type": "STRING", - "value": "PANORAMA" + "query_filter": [ + { + "fieldName": "smartAlbum", + "comparator": "EQUALS", + "fieldValue": {"type": "STRING", "value": "PANORAMA"}, } - }] + ], }, "Screenshots": { "obj_type": "CPLAssetInSmartAlbumByAssetDate:Screenshot", "list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate", "direction": "ASCENDING", - "query_filter": [{ - "fieldName": "smartAlbum", - "comparator": "EQUALS", - "fieldValue": { - "type": "STRING", - "value": "SCREENSHOT" + "query_filter": [ + { + "fieldName": "smartAlbum", + "comparator": "EQUALS", + "fieldValue": {"type": "STRING", "value": "SCREENSHOT"}, } - }] + ], }, "Live": { "obj_type": "CPLAssetInSmartAlbumByAssetDate:Live", "list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate", "direction": "ASCENDING", - "query_filter": [{ - "fieldName": "smartAlbum", - "comparator": "EQUALS", - "fieldValue": { - "type": "STRING", - "value": "LIVE" + "query_filter": [ + { + "fieldName": "smartAlbum", + "comparator": "EQUALS", + "fieldValue": {"type": "STRING", "value": "LIVE"}, } - }] + ], }, "Recently Deleted": { "obj_type": "CPLAssetDeletedByExpungedDate", "list_type": "CPLAssetAndMasterDeletedByExpungedDate", "direction": "ASCENDING", - "query_filter": None + "query_filter": None, }, "Hidden": { "obj_type": "CPLAssetHiddenByAssetDate", "list_type": "CPLAssetAndMasterHiddenByAssetDate", "direction": "ASCENDING", - "query_filter": None + "query_filter": None, }, } - def __init__(self, service, zone_id): - self.service = service - self.zone_id = zone_id + def __init__(self, service_root, session, params): + self.session = session + self.params = dict(params) + self._service_root = service_root + self.service_endpoint = ( + "%s/database/1/com.apple.photos.cloud/production/private" + % self._service_root + ) self._albums = None - url = ('%s/records/query?%s' % - (self.service._service_endpoint, urlencode(self.service.params))) - json_data = json.dumps({ - "query": {"recordType":"CheckIndexingState"}, - "zoneID": self.zone_id, - }) - - request = self.service.session.post( - url, - data=json_data, - headers={'Content-type': 'text/plain'} + self.params.update({"remapEnums": True, "getCurrentSyncToken": True}) + + url = f"{self.service_endpoint}/records/query?{urlencode(self.params)}" + json_data = ( + '{"query":{"recordType":"CheckIndexingState"},' + '"zoneID":{"zoneName":"PrimarySync"}}' + ) + request = self.session.post( + url, data=json_data, headers={"Content-type": "text/plain"} ) response = request.json() - indexing_state = response['records'][0]['fields']['state']['value'] - if indexing_state != 'FINISHED': - raise PyiCloudServiceNotActivatedErrror( - ('iCloud Photo Library not finished indexing. Please try ' - 'again in a few minutes'), None) + indexing_state = response["records"][0]["fields"]["state"]["value"] + if indexing_state != "FINISHED": + raise PyiCloudServiceNotActivatedException( + "iCloud Photo Library not finished indexing. " + "Please try again in a few minutes." + ) + + # TODO: Does syncToken ever change? # pylint: disable=fixme + # self.params.update({ + # 'syncToken': response['syncToken'], + # 'clientInstanceId': self.params.pop('clientId') + # }) + + self._photo_assets = {} @property def albums(self): + """Returns photo albums.""" if not self._albums: self._albums = { - name: PhotoAlbum(self.service, name, zone_id=self.zone_id, **props) + name: PhotoAlbum(self, name, **props) for (name, props) in self.SMART_FOLDERS.items() } for folder in self._fetch_folders(): - # FIXME: Handle subfolders - if folder['recordName'] in ('----Root-Folder----', - '----Project-Root-Folder----') or \ - (folder['fields'].get('isDeleted') and - folder['fields']['isDeleted']['value']): + + # Skiping albums having null name, that can happen sometime + if "albumNameEnc" not in folder["fields"]: + continue + + # TODO: Handle subfolders # pylint: disable=fixme + if folder["recordName"] == "----Root-Folder----" or ( + folder["fields"].get("isDeleted") + and folder["fields"]["isDeleted"]["value"] + ): continue - folder_id = folder['recordName'] - folder_obj_type = \ + folder_id = folder["recordName"] + folder_obj_type = ( "CPLContainerRelationNotDeletedByAssetDate:%s" % folder_id + ) folder_name = base64.b64decode( - folder['fields']['albumNameEnc']['value']).decode('utf-8') - query_filter = [{ - "fieldName": "parentId", - "comparator": "EQUALS", - "fieldValue": { - "type": "STRING", - "value": folder_id + folder["fields"]["albumNameEnc"]["value"] + ).decode("utf-8") + query_filter = [ + { + "fieldName": "parentId", + "comparator": "EQUALS", + "fieldValue": {"type": "STRING", "value": folder_id}, } - }] - - album = PhotoAlbum(self.service, folder_name, - 'CPLContainerRelationLiveByAssetDate', - folder_obj_type, 'ASCENDING', query_filter, - zone_id=self.zone_id) + ] + + album = PhotoAlbum( + self, + folder_name, + "CPLContainerRelationLiveByAssetDate", + folder_obj_type, + "ASCENDING", + query_filter, + ) self._albums[folder_name] = album return self._albums def _fetch_folders(self): - url = ('%s/records/query?%s' % - (self.service._service_endpoint, urlencode(self.service.params))) - json_data = json.dumps({ - "query": {"recordType":"CPLAlbumByPositionLive"}, - "zoneID": self.zone_id, - }) - - request = self.service.session.post( - url, - data=json_data, - headers={'Content-type': 'text/plain'} + url = f"{self.service_endpoint}/records/query?{urlencode(self.params)}" + json_data = ( + '{"query":{"recordType":"CPLAlbumByPositionLive"},' + '"zoneID":{"zoneName":"PrimarySync"}}' + ) + + request = self.session.post( + url, data=json_data, headers={"Content-type": "text/plain"} ) response = request.json() - return response['records'] + return response["records"] @property def all(self): - return self.albums['All Photos'] - - -class PhotosService(PhotoLibrary): - """The 'Photos' iCloud service. - - This also acts as a way to access the user's primary library. - """ - def __init__(self, service_root, session, params): - self.session = session - self.params = dict(params) - self._service_root = service_root - self._service_endpoint = \ - ('%s/database/1/com.apple.photos.cloud/production/private' - % self._service_root) - - self._libraries = None - - self.params.update({ - 'remapEnums': True, - 'getCurrentSyncToken': True - }) - - # TODO: Does syncToken ever change? - # self.params.update({ - # 'syncToken': response['syncToken'], - # 'clientInstanceId': self.params.pop('clientId') - # }) - - self._photo_assets = {} - - super(PhotosService, self).__init__( - service=self, zone_id={u'zoneName': u'PrimarySync'}) - - @property - def libraries(self): - if not self._libraries: - try: - url = ('%s/zones/list' % - (self._service_endpoint, )) - request = self.session.post( - url, - data='{}', - headers={'Content-type': 'text/plain'} - ) - response = request.json() - zones = response['zones'] - except Exception as e: - logger.error("library exception: %s" % str(e)) - - libraries = {} - for zone in zones: - if not zone.get('deleted'): - zone_name = zone['zoneID']['zoneName'] - libraries[zone_name] = PhotoLibrary( - self, zone_id=zone['zoneID']) - # obj_type='CPLAssetByAssetDateWithoutHiddenOrDeleted', - # list_type="CPLAssetAndMasterByAssetDateWithoutHiddenOrDeleted", - # direction="ASCENDING", query_filter=None, - # zone_id=zone['zoneID']) - - self._libraries = libraries - - return self._libraries - - -class PhotoAlbum(object): - - def __init__(self, service, name, list_type, obj_type, direction, - query_filter=None, page_size=100, zone_id=None): + """Returns all photos.""" + return self.albums["All Photos"] + + +class PhotoAlbum: + """A photo album.""" + + def __init__( + self, + service, + name, + list_type, + obj_type, + direction, + query_filter=None, + page_size=100, + ): self.name = name self.service = service self.list_type = list_type @@ -300,13 +250,9 @@ def __init__(self, service, name, list_type, obj_type, direction, self._len = None - if zone_id: - self._zone_id = zone_id - else: - self._zone_id = {u'zoneName': u'PrimarySync'} - @property def title(self): + """Gets the album name.""" return self.name def __iter__(self): @@ -314,79 +260,74 @@ def __iter__(self): def __len__(self): if self._len is None: - url = ('%s/internal/records/query/batch?%s' % - (self.service._service_endpoint, - urlencode(self.service.params))) + url = "{}/internal/records/query/batch?{}".format( + self.service.service_endpoint, + urlencode(self.service.params), + ) request = self.service.session.post( url, - data=json.dumps(self._count_query_gen(self.obj_type)), - headers={'Content-type': 'text/plain'} + data=json.dumps( + { + "batch": [ + { + "resultsLimit": 1, + "query": { + "filterBy": { + "fieldName": "indexCountID", + "fieldValue": { + "type": "STRING_LIST", + "value": [self.obj_type], + }, + "comparator": "IN", + }, + "recordType": "HyperionIndexCountLookup", + }, + "zoneWide": True, + "zoneID": {"zoneName": "PrimarySync"}, + } + ] + } + ), + headers={"Content-type": "text/plain"}, ) response = request.json() - self._len = (response["batch"][0]["records"][0]["fields"] - ["itemCount"]["value"]) + self._len = response["batch"][0]["records"][0]["fields"]["itemCount"][ + "value" + ] return self._len - # Perform the request in a separate method so that we - # can mock it to test session errors. - def photos_request(self, offset): - url = ('%s/records/query?' % self.service._service_endpoint) + \ - urlencode(self.service.params) - return self.service.session.post( - url, - data=json.dumps(self._list_query_gen( - offset, self.list_type, self.direction, - self.query_filter)), - headers={'Content-type': 'text/plain'} - ) - - @property def photos(self): + """Returns the album photos.""" if self.direction == "DESCENDING": offset = len(self) - 1 else: offset = 0 - exception_retries = 0 - - while(True): - try: - request = self.photos_request(offset) - except PyiCloudAPIResponseError as ex: - if self.exception_handler: - exception_retries += 1 - self.exception_handler(ex, exception_retries) - if exception_retries > 5: - raise - continue - else: - raise - - exception_retries = 0 - -# url = ('%s/records/query?' % self.service._service_endpoint) + \ -# urlencode(self.service.params) -# request = self.service.session.post( -# url, -# data=json.dumps(self._list_query_gen( -# offset, self.list_type, self.direction, -# self.query_filter)), -# headers={'Content-type': 'text/plain'} -# ) - + while True: + url = ("%s/records/query?" % self.service.service_endpoint) + urlencode( + self.service.params + ) + request = self.service.session.post( + url, + data=json.dumps( + self._list_query_gen( + offset, self.list_type, self.direction, self.query_filter + ) + ), + headers={"Content-type": "text/plain"}, + ) response = request.json() asset_records = {} master_records = [] - for rec in response['records']: - if rec['recordType'] == "CPLAsset": - master_id = \ - rec['fields']['masterRef']['value']['recordName'] + for rec in response["records"]: + if rec["recordType"] == "CPLAsset": + master_id = rec["fields"]["masterRef"]["value"]["recordName"] asset_records[master_id] = rec - elif rec['recordType'] == "CPLMaster": + elif rec["recordType"] == "CPLMaster": master_records.append(rec) master_records_len = len(master_records) @@ -397,117 +338,148 @@ def photos(self): offset = offset + master_records_len for master_record in master_records: - record_name = master_record['recordName'] - yield PhotoAsset(self.service, master_record, - asset_records[record_name]) + record_name = master_record["recordName"] + yield PhotoAsset( + self.service, master_record, asset_records[record_name] + ) else: break - def _count_query_gen(self, obj_type): - query = { - u'batch': [{ - u'resultsLimit': 1, - u'query': { - u'filterBy': { - u'fieldName': u'indexCountID', - u'fieldValue': { - u'type': u'STRING_LIST', - u'value': [ - obj_type - ] - }, - u'comparator': u'IN' - }, - u'recordType': u'HyperionIndexCountLookup' - }, - u'zoneWide': True, - u'zoneID': self._zone_id - }] - } - - return query - def _list_query_gen(self, offset, list_type, direction, query_filter=None): query = { - u'query': { - u'filterBy': [ - {u'fieldName': u'startRank', u'fieldValue': - {u'type': u'INT64', u'value': offset}, - u'comparator': u'EQUALS'}, - {u'fieldName': u'direction', u'fieldValue': - {u'type': u'STRING', u'value': direction}, - u'comparator': u'EQUALS'} + "query": { + "filterBy": [ + { + "fieldName": "startRank", + "fieldValue": {"type": "INT64", "value": offset}, + "comparator": "EQUALS", + }, + { + "fieldName": "direction", + "fieldValue": {"type": "STRING", "value": direction}, + "comparator": "EQUALS", + }, ], - u'recordType': list_type + "recordType": list_type, }, - u'resultsLimit': self.page_size * 2, - u'desiredKeys': [ - u'resJPEGFullWidth', u'resJPEGFullHeight', - u'resJPEGFullFileType', u'resJPEGFullFingerprint', - u'resJPEGFullRes', u'resJPEGLargeWidth', - u'resJPEGLargeHeight', u'resJPEGLargeFileType', - u'resJPEGLargeFingerprint', u'resJPEGLargeRes', - u'resJPEGMedWidth', u'resJPEGMedHeight', - u'resJPEGMedFileType', u'resJPEGMedFingerprint', - u'resJPEGMedRes', u'resJPEGThumbWidth', - u'resJPEGThumbHeight', u'resJPEGThumbFileType', - u'resJPEGThumbFingerprint', u'resJPEGThumbRes', - u'resVidFullWidth', u'resVidFullHeight', - u'resVidFullFileType', u'resVidFullFingerprint', - u'resVidFullRes', u'resVidMedWidth', u'resVidMedHeight', - u'resVidMedFileType', u'resVidMedFingerprint', - u'resVidMedRes', u'resVidSmallWidth', u'resVidSmallHeight', - u'resVidSmallFileType', u'resVidSmallFingerprint', - u'resVidSmallRes', u'resSidecarWidth', u'resSidecarHeight', - u'resSidecarFileType', u'resSidecarFingerprint', - u'resSidecarRes', u'itemType', u'dataClassType', - u'filenameEnc', u'originalOrientation', u'resOriginalWidth', - u'resOriginalHeight', u'resOriginalFileType', - u'resOriginalFingerprint', u'resOriginalRes', - u'resOriginalAltWidth', u'resOriginalAltHeight', - u'resOriginalAltFileType', u'resOriginalAltFingerprint', - u'resOriginalAltRes', u'resOriginalVidComplWidth', - u'resOriginalVidComplHeight', u'resOriginalVidComplFileType', - u'resOriginalVidComplFingerprint', u'resOriginalVidComplRes', - u'isDeleted', u'isExpunged', u'dateExpunged', u'remappedRef', - u'recordName', u'recordType', u'recordChangeTag', - u'masterRef', u'adjustmentRenderType', u'assetDate', - u'addedDate', u'isFavorite', u'isHidden', u'orientation', - u'duration', u'assetSubtype', u'assetSubtypeV2', - u'assetHDRType', u'burstFlags', u'burstFlagsExt', u'burstId', - u'captionEnc', u'locationEnc', u'locationV2Enc', - u'locationLatitude', u'locationLongitude', u'adjustmentType', - u'timeZoneOffset', u'vidComplDurValue', u'vidComplDurScale', - u'vidComplDispValue', u'vidComplDispScale', - u'vidComplVisibilityState', u'customRenderedValue', - u'containerId', u'itemId', u'position', u'isKeyAsset' + "resultsLimit": self.page_size * 2, + "desiredKeys": [ + "resJPEGFullWidth", + "resJPEGFullHeight", + "resJPEGFullFileType", + "resJPEGFullFingerprint", + "resJPEGFullRes", + "resJPEGLargeWidth", + "resJPEGLargeHeight", + "resJPEGLargeFileType", + "resJPEGLargeFingerprint", + "resJPEGLargeRes", + "resJPEGMedWidth", + "resJPEGMedHeight", + "resJPEGMedFileType", + "resJPEGMedFingerprint", + "resJPEGMedRes", + "resJPEGThumbWidth", + "resJPEGThumbHeight", + "resJPEGThumbFileType", + "resJPEGThumbFingerprint", + "resJPEGThumbRes", + "resVidFullWidth", + "resVidFullHeight", + "resVidFullFileType", + "resVidFullFingerprint", + "resVidFullRes", + "resVidMedWidth", + "resVidMedHeight", + "resVidMedFileType", + "resVidMedFingerprint", + "resVidMedRes", + "resVidSmallWidth", + "resVidSmallHeight", + "resVidSmallFileType", + "resVidSmallFingerprint", + "resVidSmallRes", + "resSidecarWidth", + "resSidecarHeight", + "resSidecarFileType", + "resSidecarFingerprint", + "resSidecarRes", + "itemType", + "dataClassType", + "filenameEnc", + "originalOrientation", + "resOriginalWidth", + "resOriginalHeight", + "resOriginalFileType", + "resOriginalFingerprint", + "resOriginalRes", + "resOriginalAltWidth", + "resOriginalAltHeight", + "resOriginalAltFileType", + "resOriginalAltFingerprint", + "resOriginalAltRes", + "resOriginalVidComplWidth", + "resOriginalVidComplHeight", + "resOriginalVidComplFileType", + "resOriginalVidComplFingerprint", + "resOriginalVidComplRes", + "isDeleted", + "isExpunged", + "dateExpunged", + "remappedRef", + "recordName", + "recordType", + "recordChangeTag", + "masterRef", + "adjustmentRenderType", + "assetDate", + "addedDate", + "isFavorite", + "isHidden", + "orientation", + "duration", + "assetSubtype", + "assetSubtypeV2", + "assetHDRType", + "burstFlags", + "burstFlagsExt", + "burstId", + "captionEnc", + "locationEnc", + "locationV2Enc", + "locationLatitude", + "locationLongitude", + "adjustmentType", + "timeZoneOffset", + "vidComplDurValue", + "vidComplDurScale", + "vidComplDispValue", + "vidComplDispScale", + "vidComplVisibilityState", + "customRenderedValue", + "containerId", + "itemId", + "position", + "isKeyAsset", ], - u'zoneID': self._zone_id + "zoneID": {"zoneName": "PrimarySync"}, } if query_filter: - query['query']['filterBy'].extend(query_filter) + query["query"]["filterBy"].extend(query_filter) return query - def __unicode__(self): - return self.title - def __str__(self): - as_unicode = self.__unicode__() - if sys.version_info[0] >= 3: - return as_unicode - else: - return as_unicode.encode('ascii', 'ignore') + return self.title def __repr__(self): - return "<%s: '%s'>" % ( - type(self).__name__, - self - ) + return f"<{type(self).__name__}: '{self}'>" -class PhotoAsset(object): +class PhotoAsset: + """A photo.""" + def __init__(self, service, master_record, asset_record): self._service = service self._master_record = master_record @@ -515,172 +487,152 @@ def __init__(self, service, master_record, asset_record): self._versions = None - ITEM_TYPES = { - u"public.heic": u"image", - u"public.jpeg": u"image", - u"public.png": u"image", - u"com.apple.quicktime-movie": u"movie" - } - - ITEM_TYPE_EXTENSIONS = { - u"public.heic": u"HEIC", - u"public.jpeg": u"JPG", - u"public.png": u"PNG", - u"com.apple.quicktime-movie": u"MOV" - } - PHOTO_VERSION_LOOKUP = { - u"original": u"resOriginal", - u"medium": u"resJPEGMed", - u"thumb": u"resJPEGThumb", - u"originalVideo": u"resOriginalVidCompl", - u"mediumVideo": u"resVidMed", - u"thumbVideo": u"resVidSmall", + "original": "resOriginal", + "medium": "resJPEGMed", + "thumb": "resJPEGThumb", } VIDEO_VERSION_LOOKUP = { - u"original": u"resOriginal", - u"medium": u"resVidMed", - u"thumb": u"resVidSmall" + "original": "resOriginal", + "medium": "resVidMed", + "thumb": "resVidSmall", } @property def id(self): - return self._master_record['recordName'] + """Gets the photo id.""" + return self._master_record["recordName"] @property def filename(self): - fields = self._master_record['fields'] - if 'filenameEnc' in fields and 'value' in fields['filenameEnc']: - return base64.b64decode( - fields['filenameEnc']['value'] - ).decode('utf-8') - - # Some photos don't have a filename. - # In that case, just use the truncated fingerprint (hash), - # plus the correct extension. - filename = re.sub('[^0-9a-zA-Z]', '_', self.id)[0:12] - return '.'.join([filename, self.item_type_extension]) + """Gets the photo file name.""" + return base64.b64decode( + self._master_record["fields"]["filenameEnc"]["value"] + ).decode("utf-8") @property def size(self): - return self._master_record['fields']['resOriginalRes']['value']['size'] + """Gets the photo size.""" + return self._master_record["fields"]["resOriginalRes"]["value"]["size"] @property def created(self): + """Gets the photo created date.""" return self.asset_date @property def asset_date(self): + """Gets the photo asset date.""" try: - dt = datetime.fromtimestamp( - self._asset_record['fields']['assetDate']['value'] / 1000.0, - tz=pytz.utc) - except: - dt = datetime.fromtimestamp(0) - return dt + return datetime.utcfromtimestamp( + self._asset_record["fields"]["assetDate"]["value"] / 1000.0 + ).replace(tzinfo=timezone.utc) + except KeyError: + return datetime.utcfromtimestamp(0).replace(tzinfo=timezone.utc) @property def added_date(self): - dt = datetime.fromtimestamp( - self._asset_record['fields']['addedDate']['value'] / 1000.0, - tz=pytz.utc) - return dt + """Gets the photo added date.""" + return datetime.utcfromtimestamp( + self._asset_record["fields"]["addedDate"]["value"] / 1000.0 + ).replace(tzinfo=timezone.utc) @property def dimensions(self): - return (self._master_record['fields']['resOriginalWidth']['value'], - self._master_record['fields']['resOriginalHeight']['value']) - - @property - def item_type(self): - fields = self._master_record['fields'] - if 'itemType' not in fields or 'value' not in fields['itemType']: - return 'unknown' - item_type = self._master_record['fields']['itemType']['value'] - if item_type in self.ITEM_TYPES: - return self.ITEM_TYPES[item_type] - if self.filename.lower().endswith(('.heic', '.png', '.jpg', '.jpeg')): - return 'image' - return 'movie' - - @property - def item_type_extension(self): - fields = self._master_record['fields'] - if 'itemType' not in fields or 'value' not in fields['itemType']: - return 'unknown' - item_type = self._master_record['fields']['itemType']['value'] - if item_type in self.ITEM_TYPE_EXTENSIONS: - return self.ITEM_TYPE_EXTENSIONS[item_type] - return 'unknown' + """Gets the photo dimensions.""" + return ( + self._master_record["fields"]["resOriginalWidth"]["value"], + self._master_record["fields"]["resOriginalHeight"]["value"], + ) @property def versions(self): + """Gets the photo versions.""" if not self._versions: self._versions = {} - if self.item_type == "movie": + if "resVidSmallRes" in self._master_record["fields"]: typed_version_lookup = self.VIDEO_VERSION_LOOKUP else: typed_version_lookup = self.PHOTO_VERSION_LOOKUP for key, prefix in typed_version_lookup.items(): - if '%sRes' % prefix in self._master_record['fields']: - f = self._master_record['fields'] - filename = self.filename - version = {'filename': filename} + if "%sRes" % prefix in self._master_record["fields"]: + fields = self._master_record["fields"] + version = {"filename": self.filename} - width_entry = f.get('%sWidth' % prefix) + width_entry = fields.get("%sWidth" % prefix) if width_entry: - version['width'] = width_entry['value'] + version["width"] = width_entry["value"] else: - version['width'] = None + version["width"] = None - height_entry = f.get('%sHeight' % prefix) + height_entry = fields.get("%sHeight" % prefix) if height_entry: - version['height'] = height_entry['value'] + version["height"] = height_entry["value"] else: - version['height'] = None + version["height"] = None - size_entry = f.get('%sRes' % prefix) + size_entry = fields.get("%sRes" % prefix) if size_entry: - version['size'] = size_entry['value']['size'] - version['url'] = size_entry['value']['downloadURL'] + version["size"] = size_entry["value"]["size"] + version["url"] = size_entry["value"]["downloadURL"] else: - version['size'] = None - version['url'] = None + version["size"] = None + version["url"] = None - type_entry = f.get('%sFileType' % prefix) + type_entry = fields.get("%sFileType" % prefix) if type_entry: - version['type'] = type_entry['value'] + version["type"] = type_entry["value"] else: - version['type'] = None - - # Change live photo movie file extension to .MOV - if (self.item_type == "image" and - version['type'] == "com.apple.quicktime-movie"): - if filename.lower().endswith('.heic'): - version['filename']=re.sub( - '\.[^.]+$', '_HEVC.MOV', version['filename']) - else: - version['filename'] = re.sub( - '\.[^.]+$', '.MOV', version['filename']) + version["type"] = None self._versions[key] = version return self._versions - def download(self, version='original', **kwargs): + def download(self, version="original", **kwargs): + """Returns the photo file.""" if version not in self.versions: return None return self._service.session.get( - self.versions[version]['url'], - stream=True, - **kwargs + self.versions[version]["url"], stream=True, **kwargs ) - def __repr__(self): - return "<%s: id=%s>" % ( - type(self).__name__, - self.id + def delete(self): + """Deletes the photo.""" + json_data = ( + '{"query":{"recordType":"CheckIndexingState"},' + '"zoneID":{"zoneName":"PrimarySync"}}' + ) + + json_data = ( + '{"operations":[{' + '"operationType":"update",' + '"record":{' + '"recordName":"%s",' + '"recordType":"%s",' + '"recordChangeTag":"%s",' + '"fields":{"isDeleted":{"value":1}' + "}}}]," + '"zoneID":{' + '"zoneName":"PrimarySync"' + '},"atomic":true}' + % ( + self._asset_record["recordName"], + self._asset_record["recordType"], + self._master_record["recordChangeTag"], + ) ) + + endpoint = self._service.service_endpoint + params = urlencode(self._service.params) + url = f"{endpoint}/records/modify?{params}" + + return self._service.session.post( + url, data=json_data, headers={"Content-type": "text/plain"} + ) + + def __repr__(self): + return f"<{type(self).__name__}: id={self.id}>" diff --git a/src/pyicloud_ipd/services/reminders.py b/src/pyicloud_ipd/services/reminders.py index 12e8ba23e..949586b33 100644 --- a/src/pyicloud_ipd/services/reminders.py +++ b/src/pyicloud_ipd/services/reminders.py @@ -1,121 +1,123 @@ -from __future__ import absolute_import -from datetime import datetime, timedelta +"""Reminders service.""" +from datetime import datetime import time import uuid import json -from tzlocal import get_localzone +from tzlocal import get_localzone_name -class RemindersService(object): +class RemindersService: + """The 'Reminders' iCloud service.""" + def __init__(self, service_root, session, params): self.session = session - self.params = params + self._params = params self._service_root = service_root + self.lists = {} self.collections = {} self.refresh() def refresh(self): - params_reminders = dict(self.params) - params_reminders.update({ - 'clientVersion': '4.0', - 'lang': 'en-us', - 'usertz': get_localzone().zone - }) + """Refresh data.""" + params_reminders = dict(self._params) + params_reminders.update( + {"clientVersion": "4.0", "lang": "en-us", "usertz": get_localzone_name()} + ) # Open reminders req = self.session.get( - self._service_root + '/rd/startup', - params=params_reminders + self._service_root + "/rd/startup", params=params_reminders ) - startup = req.json() + data = req.json() self.lists = {} self.collections = {} - for collection in startup['Collections']: + for collection in data["Collections"]: temp = [] - self.collections[collection['title']] = { - 'guid': collection['guid'], - 'ctag': collection['ctag'] + self.collections[collection["title"]] = { + "guid": collection["guid"], + "ctag": collection["ctag"], } - for reminder in startup['Reminders']: + for reminder in data["Reminders"]: - if reminder['pGuid'] != collection['guid']: + if reminder["pGuid"] != collection["guid"]: continue - if 'dueDate' in reminder: - if reminder['dueDate']: - due = datetime( - reminder['dueDate'][1], - reminder['dueDate'][2], reminder['dueDate'][3], - reminder['dueDate'][4], reminder['dueDate'][5] - ) - else: - due = None + + if reminder.get("dueDate"): + due = datetime( + reminder["dueDate"][1], + reminder["dueDate"][2], + reminder["dueDate"][3], + reminder["dueDate"][4], + reminder["dueDate"][5], + ) else: due = None - if reminder['description']: - desc = reminder['description'] - else: - desc = "" - temp.append({ - "title": reminder['title'], - "desc": desc, - "due": due - }) - self.lists[collection['title']] = temp - - def post(self, title, description="", collection=None, dueDate=None): - pguid = 'tasks' + + temp.append( + { + "title": reminder["title"], + "desc": reminder.get("description"), + "due": due, + } + ) + self.lists[collection["title"]] = temp + + def post(self, title, description="", collection=None, due_date=None): + """Adds a new reminder.""" + pguid = "tasks" if collection: if collection in self.collections: - pguid = self.collections[collection]['guid'] - - params_reminders = dict(self.params) - params_reminders.update({ - 'clientVersion': '4.0', - 'lang': 'en-us', - 'usertz': get_localzone().zone - }) - - dueDateList = None - if dueDate: - dueDateList = [ - int(str(dueDate.year) + str(dueDate.month) + str(dueDate.day)), - dueDate.year, - dueDate.month, - dueDate.day, - dueDate.hour, - dueDate.minute + pguid = self.collections[collection]["guid"] + + params_reminders = dict(self._params) + params_reminders.update( + {"clientVersion": "4.0", "lang": "en-us", "usertz": get_localzone_name()} + ) + + due_dates = None + if due_date: + due_dates = [ + int(str(due_date.year) + str(due_date.month) + str(due_date.day)), + due_date.year, + due_date.month, + due_date.day, + due_date.hour, + due_date.minute, ] req = self.session.post( - self._service_root + '/rd/reminders/tasks', - data=json.dumps({ - "Reminders": { - 'title': title, - "description": description, - "pGuid": pguid, - "etag": None, - "order": None, - "priority": 0, - "recurrence": None, - "alarms": [], - "startDate": None, - "startDateTz": None, - "startDateIsAllDay": False, - "completedDate": None, - "dueDate": dueDateList, - "dueDateIsAllDay": False, - "lastModifiedDate": None, - "createdDate": None, - "isFamily": None, - "createdDateExtended": int(time.time()*1000), - "guid": str(uuid.uuid4()) - }, - "ClientState": {"Collections": list(self.collections.values())} - }), - params=params_reminders) + self._service_root + "/rd/reminders/tasks", + data=json.dumps( + { + "Reminders": { + "title": title, + "description": description, + "pGuid": pguid, + "etag": None, + "order": None, + "priority": 0, + "recurrence": None, + "alarms": [], + "startDate": None, + "startDateTz": None, + "startDateIsAllDay": False, + "completedDate": None, + "dueDate": due_dates, + "dueDateIsAllDay": False, + "lastModifiedDate": None, + "createdDate": None, + "isFamily": None, + "createdDateExtended": int(time.time() * 1000), + "guid": str(uuid.uuid4()), + }, + "ClientState": {"Collections": list(self.collections.values())}, + } + ), + params=params_reminders, + ) return req.ok diff --git a/src/pyicloud_ipd/services/ubiquity.py b/src/pyicloud_ipd/services/ubiquity.py index 30c7ec598..f45a78bc9 100644 --- a/src/pyicloud_ipd/services/ubiquity.py +++ b/src/pyicloud_ipd/services/ubiquity.py @@ -1,48 +1,42 @@ +"""File service.""" from datetime import datetime -import sys -class UbiquityService(object): - """ The 'Ubiquity' iCloud service.""" +class UbiquityService: + """The 'Ubiquity' iCloud service.""" def __init__(self, service_root, session, params): self.session = session self.params = params + self._root = None + self._node_url = service_root + "/ws/%s/%s/%s" - self._service_root = service_root - self._node_url = '/ws/%s/%s/%s' + @property + def root(self): + """Gets the root node.""" + if not self._root: + self._root = self.get_node(0) + return self._root - def get_node_url(self, id, variant='item'): - return self._service_root + self._node_url % ( - self.params['dsid'], - variant, - id - ) + def get_node_url(self, node_id, variant="item"): + """Returns a node URL.""" + return self._node_url % (self.params["dsid"], variant, node_id) - def get_node(self, id): - request = self.session.get(self.get_node_url(id)) + def get_node(self, node_id): + """Returns a node.""" + request = self.session.get(self.get_node_url(node_id)) return UbiquityNode(self, request.json()) - def get_children(self, id): - request = self.session.get( - self.get_node_url(id, 'parent') - ) - items = request.json()['item_list'] + def get_children(self, node_id): + """Returns a node children.""" + request = self.session.get(self.get_node_url(node_id, "parent")) + items = request.json()["item_list"] return [UbiquityNode(self, item) for item in items] - def get_file(self, id, **kwargs): - request = self.session.get( - self.get_node_url(id, 'file'), - **kwargs - ) - return request - - @property - def root(self): - if not self._root: - self._root = self.get_node(0) - return self._root + def get_file(self, node_id, **kwargs): + """Returns a node file.""" + return self.session.get(self.get_node_url(node_id, "file"), **kwargs) def __getattr__(self, attr): return getattr(self.root, attr) @@ -51,71 +45,69 @@ def __getitem__(self, key): return self.root[key] -class UbiquityNode(object): +class UbiquityNode: + """Ubiquity node.""" + def __init__(self, conn, data): self.data = data self.connection = conn + self._children = None + @property def item_id(self): - return self.data.get('item_id') + """Gets the node id.""" + return self.data.get("item_id") @property def name(self): - return self.data.get('name') + """Gets the node name.""" + return self.data.get("name") @property def type(self): - return self.data.get('type') - - def get_children(self): - if not hasattr(self, '_children'): - self._children = self.connection.get_children(self.item_id) - return self._children + """Gets the node type.""" + return self.data.get("type") @property def size(self): + """Gets the node size.""" try: - return int(self.data.get('size')) + return int(self.data.get("size")) except ValueError: return None @property def modified(self): - return datetime.strptime( - self.data.get('modified'), - '%Y-%m-%dT%H:%M:%SZ' - ) - - def dir(self): - return [child.name for child in self.get_children()] + """Gets the node modified date.""" + return datetime.strptime(self.data.get("modified"), "%Y-%m-%dT%H:%M:%SZ") def open(self, **kwargs): + """Returns the node file.""" return self.connection.get_file(self.item_id, **kwargs) + def get_children(self): + """Returns the node children.""" + if not self._children: + self._children = self.connection.get_children(self.item_id) + return self._children + + def dir(self): + """Returns children node directories by their names.""" + return [child.name for child in self.get_children()] + def get(self, name): - return [ - child for child in self.get_children() if child.name == name - ][0] + """Returns a child node by its name.""" + return [child for child in self.get_children() if child.name == name][0] def __getitem__(self, key): try: return self.get(key) - except IndexError: - raise KeyError('No child named %s exists' % key) - - def __unicode__(self): - return self.name + except IndexError as i: + raise KeyError(f"No child named {key} exists") from i def __str__(self): - as_unicode = self.__unicode__() - if sys.version_info[0] >= 3: - return as_unicode - else: - return as_unicode.encode('ascii', 'ignore') + return self.name def __repr__(self): - return "<%s: '%s'>" % ( - self.type.capitalize(), - self - ) + return f"<{self.type.capitalize()}: '{self}'>" diff --git a/src/pyicloud_ipd/utils.py b/src/pyicloud_ipd/utils.py index 378b52ca9..796de45bc 100644 --- a/src/pyicloud_ipd/utils.py +++ b/src/pyicloud_ipd/utils.py @@ -1,43 +1,44 @@ +"""Utils.""" import getpass import keyring import sys -from .exceptions import NoStoredPasswordAvailable +from .exceptions import PyiCloudNoStoredPasswordAvailableException -KEYRING_SYSTEM = 'pyicloud://icloud-password' +KEYRING_SYSTEM = "pyicloud://icloud-password" def get_password(username, interactive=sys.stdout.isatty()): + """Get the password from a username.""" try: return get_password_from_keyring(username) - except NoStoredPasswordAvailable: + except PyiCloudNoStoredPasswordAvailableException: if not interactive: raise return getpass.getpass( - 'Enter iCloud password for {username}: '.format( + "Enter iCloud password for {username}: ".format( username=username, ) ) def password_exists_in_keyring(username): + """Return true if the password of a username exists in the keyring.""" try: get_password_from_keyring(username) - except NoStoredPasswordAvailable: + except PyiCloudNoStoredPasswordAvailableException: return False return True def get_password_from_keyring(username): - result = keyring.get_password( - KEYRING_SYSTEM, - username - ) + """Get the password from a username.""" + result = keyring.get_password(KEYRING_SYSTEM, username) if result is None: - raise NoStoredPasswordAvailable( + raise PyiCloudNoStoredPasswordAvailableException( "No pyicloud password for {username} could be found " "in the system keychain. Use the `--store-in-keyring` " "command-line option for storing a password for this " @@ -50,6 +51,7 @@ def get_password_from_keyring(username): def store_password_in_keyring(username, password): + """Store the password of a username.""" return keyring.set_password( KEYRING_SYSTEM, username, @@ -58,6 +60,7 @@ def store_password_in_keyring(username, password): def delete_password_in_keyring(username): + """Delete the password of a username.""" return keyring.delete_password( KEYRING_SYSTEM, username, @@ -65,8 +68,9 @@ def delete_password_in_keyring(username): def underscore_to_camelcase(word, initial_capital=False): - words = [x.capitalize() or '_' for x in word.split('_')] + """Transform a word to camelCase.""" + words = [x.capitalize() or "_" for x in word.split("_")] if not initial_capital: words[0] = words[0].lower() - return ''.join(words) + return "".join(words)