""" Storage abstraction layer for Sharey Supports multiple storage backends: local filesystem and Backblaze B2 """ import json import os import shutil from abc import ABC, abstractmethod from pathlib import Path from typing import Optional, BinaryIO from b2sdk.v2 import InMemoryAccountInfo, B2Api class StorageBackend(ABC): """Abstract base class for storage backends""" @abstractmethod def upload_file(self, file_content: bytes, file_path: str, content_type: str = None, metadata: Optional[dict] = None) -> bool: """Upload a file to storage with optional metadata""" pass @abstractmethod def download_file(self, file_path: str) -> bytes: """Download a file from storage""" pass @abstractmethod def delete_file(self, file_path: str) -> bool: """Delete a file from storage""" pass @abstractmethod def file_exists(self, file_path: str) -> bool: """Check if a file exists in storage""" pass @abstractmethod def get_file_size(self, file_path: str) -> Optional[int]: """Get file size in bytes""" pass @abstractmethod def list_files(self, prefix: str = "") -> list: """List files with optional prefix""" pass @abstractmethod def get_metadata(self, file_path: str) -> Optional[dict]: """Get file metadata""" pass class LocalStorageBackend(StorageBackend): """Local filesystem storage backend""" def __init__(self, base_path: str = "storage"): self.base_path = Path(base_path) self.base_path.mkdir(exist_ok=True) print(f"✅ Local storage initialized at: {self.base_path.absolute()}") def _get_full_path(self, file_path: str) -> Path: """Get full filesystem path for a file""" return self.base_path / file_path def upload_file(self, file_content: bytes, file_path: str, content_type: str = None, metadata: Optional[dict] = None) -> bool: """Upload a file to local storage with optional metadata""" try: full_path = self._get_full_path(file_path) full_path.parent.mkdir(parents=True, exist_ok=True) # Write the main file with open(full_path, 'wb') as f: f.write(file_content) # Write metadata to a .meta file if provided if metadata: import json meta_path = full_path.with_suffix(full_path.suffix + '.meta') with open(meta_path, 'w') as f: json.dump(metadata, f) print(f"📄 Metadata saved for: {file_path}") print(f"✅ Local upload successful: {file_path}") return True except Exception as e: print(f"❌ Local upload failed for {file_path}: {e}") return False def download_file(self, file_path: str) -> bytes: """Download a file from local storage""" try: full_path = self._get_full_path(file_path) with open(full_path, 'rb') as f: return f.read() except Exception as e: print(f"❌ Local download failed for {file_path}: {e}") raise FileNotFoundError(f"File not found: {file_path}") def delete_file(self, file_path: str) -> bool: """Delete a file from local storage""" try: full_path = self._get_full_path(file_path) if full_path.exists(): full_path.unlink() print(f"✅ Local file deleted: {file_path}") return True return False except Exception as e: print(f"❌ Local delete failed for {file_path}: {e}") return False def file_exists(self, file_path: str) -> bool: """Check if a file exists in local storage""" return self._get_full_path(file_path).exists() def get_file_size(self, file_path: str) -> Optional[int]: """Get file size in bytes""" try: full_path = self._get_full_path(file_path) if full_path.exists(): return full_path.stat().st_size return None except Exception: return None def list_files(self, prefix: str = "") -> list: """List files with optional prefix""" try: if prefix: search_path = self.base_path / prefix if search_path.is_dir(): return 
[str(p.relative_to(self.base_path)) for p in search_path.rglob("*") if p.is_file()] else: # Search for files matching prefix pattern parent = search_path.parent if parent.exists(): pattern = search_path.name + "*" return [str(p.relative_to(self.base_path)) for p in parent.glob(pattern) if p.is_file()] else: return [str(p.relative_to(self.base_path)) for p in self.base_path.rglob("*") if p.is_file()] return [] except Exception as e: print(f"❌ Local list failed: {e}") return [] def get_metadata(self, file_path: str) -> Optional[dict]: """Get file metadata from local storage""" try: meta_path = self._get_full_path(file_path + '.meta') if meta_path.exists(): with open(meta_path, 'r') as f: metadata = json.load(f) return metadata return None except Exception as e: print(f"❌ Local metadata read failed for {file_path}: {e}") return None class B2StorageBackend(StorageBackend): """Backblaze B2 cloud storage backend""" def __init__(self, key_id: str, key: str, bucket_name: str): self.key_id = key_id self.key = key self.bucket_name = bucket_name try: info = InMemoryAccountInfo() self.b2_api = B2Api(info) self.b2_api.authorize_account("production", key_id, key) self.bucket = self.b2_api.get_bucket_by_name(bucket_name) print(f"✅ B2 storage initialized with bucket: {bucket_name}") except Exception as e: print(f"❌ Failed to initialize B2 storage: {e}") raise def upload_file(self, file_content: bytes, file_path: str, content_type: str = None, metadata: Optional[dict] = None) -> bool: """Upload a file to B2 storage with optional metadata""" try: # Prepare file info (metadata) for B2 file_info = {} if metadata: # B2 metadata keys must be strings and values must be strings for key, value in metadata.items(): file_info[str(key)] = str(value) self.bucket.upload_bytes( data_bytes=file_content, file_name=file_path, content_type=content_type or 'application/octet-stream', file_info=file_info ) print(f"✅ B2 upload successful: {file_path}") if metadata: print(f"📄 B2 metadata saved for: {file_path}") return True except Exception as e: print(f"❌ B2 upload failed for {file_path}: {e}") return False def download_file(self, file_path: str) -> bytes: """Download a file from B2 storage""" try: download_response = self.bucket.download_file_by_name(file_path) return download_response.response.content except Exception as e: print(f"❌ B2 download failed for {file_path}: {e}") raise FileNotFoundError(f"File not found: {file_path}") def delete_file(self, file_path: str) -> bool: """Delete a file from B2 storage""" try: file_info = self.bucket.get_file_info_by_name(file_path) self.bucket.delete_file_version(file_info.id_, file_info.file_name) print(f"✅ B2 file deleted: {file_path}") return True except Exception as e: print(f"❌ B2 delete failed for {file_path}: {e}") return False def file_exists(self, file_path: str) -> bool: """Check if a file exists in B2 storage""" try: self.bucket.get_file_info_by_name(file_path) return True except Exception: return False def get_file_size(self, file_path: str) -> Optional[int]: """Get file size in bytes""" try: file_info = self.bucket.get_file_info_by_name(file_path) return file_info.size except Exception: return None def list_files(self, prefix: str = "") -> list: """List files with optional prefix""" try: file_list = [] # Use the basic ls() method without parameters first for file_version, folder in self.bucket.ls(): if prefix == "" or file_version.file_name.startswith(prefix): file_list.append(file_version.file_name) return file_list except Exception as e: print(f"❌ B2 list failed: {e}") 
return [] def get_metadata(self, file_path: str) -> Optional[dict]: """Get file metadata from B2 storage""" try: # Get file info from B2 file_info = self.bucket.get_file_info_by_name(file_path) # B2 stores metadata in file_info attribute metadata = {} if hasattr(file_info, 'file_info') and file_info.file_info: metadata.update(file_info.file_info) return metadata if metadata else None except Exception as e: print(f"❌ B2 metadata read failed for {file_path}: {e}") return None class StorageManager: """Storage manager that handles different backends""" def __init__(self, config): self.config = config self.backend = self._initialize_backend() def _initialize_backend(self) -> StorageBackend: """Initialize the appropriate storage backend based on configuration""" storage_config = self.config.get('storage', {}) backend_type = storage_config.get('backend', 'b2').lower() if backend_type == 'local': storage_path = storage_config.get('local_path', 'storage') return LocalStorageBackend(storage_path) elif backend_type == 'b2': # Validate B2 configuration if not self.config.validate_b2_config(): raise ValueError("Invalid B2 configuration") b2_config = self.config.get_b2_config() return B2StorageBackend( key_id=b2_config['key_id'], key=b2_config['key'], bucket_name=b2_config['bucket_name'] ) else: raise ValueError(f"Unknown storage backend: {backend_type}") def upload_file(self, file_content: bytes, file_path: str, content_type: str = None, metadata: Optional[dict] = None) -> bool: """Upload a file using the configured backend with optional metadata""" return self.backend.upload_file(file_content, file_path, content_type, metadata) def download_file(self, file_path: str) -> bytes: """Download a file using the configured backend""" return self.backend.download_file(file_path) def delete_file(self, file_path: str) -> bool: """Delete a file using the configured backend""" return self.backend.delete_file(file_path) def file_exists(self, file_path: str) -> bool: """Check if a file exists using the configured backend""" return self.backend.file_exists(file_path) def get_file_size(self, file_path: str) -> Optional[int]: """Get file size using the configured backend""" return self.backend.get_file_size(file_path) def list_files(self, prefix: str = "") -> list: """List files using the configured backend""" return self.backend.list_files(prefix) def get_metadata(self, file_path: str) -> Optional[dict]: """Get file metadata using the configured backend""" return self.backend.get_metadata(file_path) def get_backend_type(self) -> str: """Get the current backend type""" storage_config = self.config.get('storage', {}) return storage_config.get('backend', 'b2').lower() def get_backend_info(self) -> dict: """Get information about the current backend""" backend_type = self.get_backend_type() info = {'type': backend_type} if backend_type == 'local': storage_config = self.config.get('storage', {}) info['path'] = storage_config.get('local_path', 'storage') elif backend_type == 'b2': b2_config = self.config.get_b2_config() info['bucket'] = b2_config.get('bucket_name', 'unknown') return info
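

if __name__ == "__main__":
    # Illustrative usage sketch, not part of Sharey's application flow.
    # StorageManager expects a config object exposing dict-style get() plus
    # validate_b2_config()/get_b2_config(); _DemoConfig below is a hypothetical
    # stand-in for the real Sharey config object, included only to make this
    # example self-contained.

    class _DemoConfig(dict):
        def validate_b2_config(self) -> bool:
            b2 = self.get('b2', {})
            return all(b2.get(k) for k in ('key_id', 'key', 'bucket_name'))

        def get_b2_config(self) -> dict:
            return self.get('b2', {})

    demo_config = _DemoConfig({'storage': {'backend': 'local', 'local_path': 'storage'}})
    manager = StorageManager(demo_config)

    # Round-trip a small file and its metadata through the local backend
    manager.upload_file(b"hello", "demo/hello.txt",
                        content_type="text/plain",
                        metadata={"owner": "demo"})
    print(manager.download_file("demo/hello.txt"))
    print(manager.get_metadata("demo/hello.txt"))
    print(manager.get_backend_info())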