"""Back up an iCloud photo library to a NAS directory and trigger Immich scans.

Every photo is stored once in the NAS root directory under a name that
includes its iCloud ID; each album gets a sub-directory of symlinks pointing
at those files. Periodically (and once at the end of a run) the configured
Immich external library is asked to rescan.
"""

import json
import logging
import mimetypes
import os
import re
import shutil
import sys
import time
from datetime import datetime

import piexif
import requests
from pyicloud import PyiCloudService
from pyicloud.exceptions import PyiCloudFailedLoginException, PyiCloudAPIResponseException

# When False, photos that already exist on the NAS are not downloaded again.
OVERWRITE = False
IMMICH_API_KEY = os.getenv("IMMICH_API_KEY")
NAS_IMMICH_LIBRARY_ID = os.getenv("NAS_IMMICH_LIBRARY_ID")
# Base URL of the Immich server; the default is the previously hard-coded value.
IMMICH_BASE_URL = os.getenv("IMMICH_BASE_URL", "http://immich_server:2283")
# Seconds to wait for Immich to answer before giving up on a request.
IMMICH_REQUEST_TIMEOUT = 30
# A scan + pause is triggered once MORE than this many photos were added.
IMMICH_SCAN_BATCH_SIZE = 100
# Seconds to sleep after triggering a scan so Immich can process new files.
IMMICH_SCAN_PAUSE_SECONDS = 600

program_start_time = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
log_file = f"/var/log/icloud_backup_{program_start_time}.log"
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler(log_file),
        logging.StreamHandler(sys.stdout)
    ]
)


def authenticate_icloud(username, password, cookie_dir):
    """Log in to iCloud and return the service object.

    Prompts on stdin for a 2FA code and/or a 2SA security code when the
    service reports that one is required. Exits the process with status 1 on
    a failed login or a rejected code.
    """
    try:
        api = PyiCloudService(username, password, cookie_directory=cookie_dir)
    except PyiCloudFailedLoginException as e:
        logging.error("Failed to authenticate to iCloud: %s", e)
        sys.exit(1)

    if api.requires_2fa:
        logging.info("Two factor auth required.")
        code = input("Enter the 2fa code: ")
        if not api.validate_2fa_code(code):
            logging.error("Invalid 2fa code.")
            sys.exit(1)

    if api.requires_2sa:
        logging.info("Two step authentication enabled")
        security_code = input("Enter the security code: ")
        # NOTE(review): confirm the installed pyicloud exposes validate_2sa_code.
        if not api.validate_2sa_code(security_code):
            logging.error("Invalid security code.")
            sys.exit(1)

    return api


def refresh_icloud_session(api):
    """Call ``api.refresh()``; log and re-raise any failure."""
    try:
        api.refresh()
        logging.info("iCloud session refreshed successfully")
    except Exception as e:
        logging.error(f"Failed to refresh iCloud session: {e}")
        # Bare raise keeps the original traceback intact.
        raise


def ensure_unique_filename(photo, nas_dir):
    """Return the NAS path for *photo*, named ``<base>_<sanitized id><ext>``.

    If a file stored under the older naming pattern (the plain filename)
    exists, it is renamed to the ID-based name, unless a file with the
    ID-based name is already present, in which case nothing is renamed so
    that the existing ID-named file is never replaced.
    """
    # Original file storage pattern
    original_name = photo.filename
    base_name, ext = os.path.splitext(original_name)
    legacy_path = os.path.join(nas_dir, original_name)

    # New file storage pattern: keep only characters that are safe in a filename.
    sanitized_id = re.sub(r'[^a-zA-Z0-9_-]', '', str(photo.id))
    name_with_id = f"{base_name}_{sanitized_id}{ext}"
    path_with_id = os.path.join(nas_dir, name_with_id)

    if os.path.exists(legacy_path):
        if os.path.exists(path_with_id):
            # os.rename may silently replace an existing destination; leave both files alone.
            logging.warning(
                "Not renaming %s: %s already exists", legacy_path, path_with_id
            )
        else:
            try:
                os.rename(legacy_path, path_with_id)
                logging.info(f"Renamed existing file {legacy_path} to {path_with_id}")
            except OSError as e:
                logging.error("Failed to rename file %s: %s", legacy_path, e)

    return path_with_id


def handle_media_download(photo, temp_dir, nas_dir, album_names):
    """Download *photo* to the NAS and symlink it into each album directory.

    Returns True when the photo was newly saved and False when it already
    existed and ``OVERWRITE`` is off (in which case no symlinks are touched).
    Download/copy errors are logged and re-raised; symlink errors are only
    logged.
    """
    photo_name = photo.filename
    temp_path = os.path.join(temp_dir, photo_name)
    nas_photo_path = ensure_unique_filename(photo, nas_dir)

    if not OVERWRITE and os.path.exists(nas_photo_path):
        logging.debug("Photo %s already exists on NAS. Skipping", photo_name)
        return False

    try:
        download_media(photo, temp_path)
        handle_live_photo(photo, temp_dir, nas_dir)
        copy_media(photo, temp_path, nas_photo_path, temp_path)
        logging.info(f"Successfully saved photo {photo.id} to NAS at {nas_photo_path}\n")
    except Exception as e:
        logging.error("Failed to process photo %s: %s", photo_name, e)
        raise

    # Create symbolic links in each album directory
    for album_name in album_names:
        album_dir = os.path.join(nas_dir, album_name)
        os.makedirs(album_dir, exist_ok=True)
        album_photo_path = os.path.join(album_dir, photo_name)
        if os.path.exists(album_photo_path):
            continue
        try:
            os.symlink(nas_photo_path, album_photo_path)
            logging.debug("Created symlink for photo %s in album %s", photo_name, album_name)
        except FileExistsError:
            # os.path.exists is False for a dangling link, so this can still happen.
            logging.warning("Symlink for photo %s in album %s already exists", photo_name, album_name)
        except Exception as e:
            logging.error("Failed to create symlink for photo %s in album %s: %s", photo_name, album_name, e)

    return True


def download_media(media, temp_path, *download_args):
    """Stream ``media.download(*download_args)`` into *temp_path*.

    After the file is closed, ``ensure_metadata`` is run on it.
    """
    logging.debug("Downloading media: %s", media.filename)
    with open(temp_path, 'wb') as f:
        for chunk in media.download(*download_args).iter_content(chunk_size=1024):
            if chunk:
                f.write(chunk)
    logging.debug("Downloaded media: %s", media.filename)
    ensure_metadata(media, temp_path)


def ensure_metadata(media, file_path):
    """Write a capture date into the Exif data of *file_path* if it has none.

    Only files whose guessed MIME type is JPEG or TIFF are inspected. The
    date is taken from ``media.created``. Any error is logged and swallowed
    so that a metadata problem never aborts the download.
    """
    try:
        mime_type, _ = mimetypes.guess_type(file_path)
        if mime_type not in ["image/jpeg", "image/tiff"]:
            logging.debug(f"Skipping metadata check for unsupported file type {mime_type} ({file_path})")
            return

        exif_dict = piexif.load(file_path)
        datetime_original = exif_dict.get("Exif", {}).get(piexif.ExifIFD.DateTimeOriginal)
        if datetime_original:
            logging.debug(f"File already has datetime metadata {file_path}")
            return

        logging.debug(f"Missing datetime metadata for file {file_path}")
        datetime_value = media.created.strftime("%Y:%m:%d %H:%M:%S")
        exif_dict["Exif"][piexif.ExifIFD.DateTimeOriginal] = datetime_value
        exif_dict["Exif"][piexif.ExifIFD.DateTimeDigitized] = datetime_value
        exif_dict["0th"][piexif.ImageIFD.DateTime] = datetime_value
        exif_bytes = piexif.dump(exif_dict)
        piexif.insert(exif_bytes, file_path)
        logging.debug(f"Updated Exif metadata for file {file_path}\nDatetime: {datetime_value}")
    except Exception as e:
        # Deliberately best-effort: log and carry on with the unmodified file.
        logging.error(f"Error ensuring metadata for file {file_path}: {e}")


def copy_media(media, source_path, destination_path, temp_path):
    """Copy *source_path* to *destination_path*, then delete *temp_path*."""
    logging.debug("Copying media %s to NAS", media.filename)
    shutil.copy2(source_path, destination_path)
    logging.debug("Copied media %s to NAS", media.filename)
    os.remove(temp_path)
    logging.debug("Removed temp file: %s", temp_path)


def handle_live_photo(photo, temp_dir, nas_dir):
    """Save the video part of a live photo as ``<base>.mov`` in *nas_dir*.

    A photo is treated as a live photo when its ``medium`` version has type
    ``com.apple.quicktime-movie``. Nothing is downloaded when the ``.mov``
    file already exists on the NAS.
    """
    base_name, _ = os.path.splitext(photo.filename)
    # NOTE(review): reads the private ``_versions`` attribute; confirm it is
    # populated by the time this runs (it is called after the main download).
    versions = photo._versions

    # Serializing every photo's versions is wasted work unless DEBUG is on.
    if logging.getLogger().isEnabledFor(logging.DEBUG):
        logging.debug(f"Checking for live photo: {json.dumps(versions, indent=4)}")

    if 'medium' not in versions or versions['medium']['type'] != 'com.apple.quicktime-movie':
        return

    logging.debug("This is a live photo")
    video_name = f"{base_name}.mov"
    temp_video_path = os.path.join(temp_dir, video_name)
    nas_video_path = os.path.join(nas_dir, video_name)
    if not os.path.exists(nas_video_path):
        logging.debug("Downloading Live Photo video component: %s", video_name)
        download_media(photo, temp_video_path, "medium")
        copy_media(photo, temp_video_path, nas_video_path, temp_video_path)


def get_all_media(api, temp_dir, nas_dir):
    """Download every photo in every iCloud album and return True when done.

    Photos are grouped by ``photo.id`` so that a photo appearing in several
    albums is downloaded once and linked into all of them. After more than
    ``IMMICH_SCAN_BATCH_SIZE`` new photos, an Immich scan is triggered and
    the run pauses for ``IMMICH_SCAN_PAUSE_SECONDS``; a final scan is
    triggered for any photos added since the last one.
    """
    all_albums = api.photos.albums
    # photo id -> (photo object, [album names]). Keyed by id because equality
    # of two asset objects for the same photo is not guaranteed from here.
    photos_by_id = {}
    album_titles = set()

    for album in all_albums.values():
        album_name = album.title
        album_titles.add(album_name)
        logging.info("Categorizing photos in album: %s", album_name)
        for photo in album.photos:
            _, names = photos_by_id.setdefault(photo.id, (photo, []))
            names.append(album_name)

    logging.info("Found %d unique photos in iCloud", len(photos_by_id))
    joined_titles = '\n'.join(sorted(album_titles))
    logging.info(f"All album names: {joined_titles}")

    # Download each photo and handle its album associations
    added_since_scan = 0
    for photo, album_names in photos_by_id.values():
        if added_since_scan > IMMICH_SCAN_BATCH_SIZE:
            scan_library(NAS_IMMICH_LIBRARY_ID)
            logging.info("Sleeping while Immich processes new photos")
            time.sleep(IMMICH_SCAN_PAUSE_SECONDS)
            logging.info("Resuming...")
            added_since_scan = 0

        if handle_media_download(photo, temp_dir, nas_dir, album_names):
            added_since_scan += 1

    # Without this, the last partial batch would never be scanned by Immich.
    if added_since_scan:
        scan_library(NAS_IMMICH_LIBRARY_ID)

    return True


def get_immich_libraries():
    """Fetch the Immich library list and log the decoded JSON response."""
    url = f"{IMMICH_BASE_URL}/api/libraries"
    headers = {
        "Accept": "application/json",
        "x-api-key": IMMICH_API_KEY
    }
    response = requests.get(url, headers=headers, timeout=IMMICH_REQUEST_TIMEOUT)
    logging.info(f"Libraries: {response.json()}")


def scan_library(library_id):
    """Ask Immich to scan *library_id*.

    Returns True when the request succeeded and False when it failed; the
    failure is logged and never raised.
    """
    url = f"{IMMICH_BASE_URL}/api/libraries/{library_id}/scan"
    headers = {
        "Accept": "application/json",
        "x-api-key": IMMICH_API_KEY
    }
    try:
        logging.info("Triggering library scan...")
        response = requests.post(url, headers=headers, timeout=IMMICH_REQUEST_TIMEOUT)
        response.raise_for_status()
    except requests.exceptions.HTTPError as e:
        logging.error(f"Attempt to trigger Immich library scan failed due to HTTP error: {e}")
        return False
    except Exception as e:
        logging.error(f"Attempt to trigger Immich library scan failed: {e}")
        return False

    logging.info("Successfully started library scan")
    return True


def main():
    """Read configuration from the environment and run the backup to completion.

    Exits with status 1 when a required environment variable is missing.
    A ``PyiCloudAPIResponseException`` causes a re-authentication and retry
    after a pause; any other exception is logged and re-raised.
    """
    icloud_username = os.getenv("ICLOUD_USERNAME")
    icloud_password = os.getenv("ICLOUD_PASSWORD")
    temp_dir = os.getenv("TEMP_DIR", "/backup/temp")
    nas_dir = os.getenv("NAS_DIR", "/mnt/photos_backup/icloud")
    cookie_dir = os.getenv("COOKIE_DIR", "/app/.pyicloud")

    if not NAS_IMMICH_LIBRARY_ID:
        logging.error("NAS_IMMICH_LIBRARY_ID must be set. Exiting...")
        sys.exit(1)

    if not IMMICH_API_KEY:
        logging.error("IMMICH_API_KEY must be set. Exiting...")
        sys.exit(1)

    if not icloud_username or not icloud_password:
        logging.error("icloud credentials not set. Please set ICLOUD_USERNAME and ICLOUD_PASSWORD environment variables.")
        sys.exit(1)

    # Create necessary directories if they don't exist
    os.makedirs(temp_dir, exist_ok=True)
    os.makedirs(nas_dir, exist_ok=True)

    completed = False
    while not completed:
        try:
            api = authenticate_icloud(icloud_username, icloud_password, cookie_dir)
            completed = get_all_media(api, temp_dir, nas_dir)
        except PyiCloudAPIResponseException as e:
            if e.code == 410:
                logging.warning("Detected a gone error, restarting...")
            else:
                logging.error(f"Detected another HTTP error: {e}")
            # Back off before re-authenticating so a persistent error does not spin.
            time.sleep(60)
        except Exception as e:
            logging.error(f"Encountered an error: {e}")
            raise

    logging.info("Completed backup without erroring")


if __name__ == "__main__":
    main()