diff --git a/main.py b/main.py index d82a24f..c05b6b6 100644 --- a/main.py +++ b/main.py @@ -1,8 +1,8 @@ #!/usr/bin/env python3 -""" +"" Instagram DM Bulk Automation — Powered by SoClose https://soclose.co -""" +""] import os import sys @@ -43,7 +43,6 @@ StaleElementReferenceException, ) - # ─── SoClose Brand Theme ──────────────────────────────────── SOCLOSE_THEME = Theme( @@ -59,7 +58,6 @@ console = Console(theme=SOCLOSE_THEME) - # ─── Configuration ─────────────────────────────────────────── load_dotenv() @@ -67,15 +65,14 @@ INSTAGRAM_EMAIL = os.getenv("INSTAGRAM_EMAIL", "") INSTAGRAM_PASSWORD = os.getenv("INSTAGRAM_PASSWORD", "") BROWSER = os.getenv("BROWSER", "firefox").lower() -MESSAGE_FILE = os.getenv("MESSAGE_FILE", "message.txt") -PROFILES_FILE = os.getenv("PROFILES_FILE", "profile_links.csv") -SENT_FILE = os.getenv("SENT_FILE", "already_send_message.csv") +MESSAGE_FILE = os.getenv("MESSAGE_FILE", "/path/to/message.txt") +PROFILES_FILE = os.getenv("PROFILES_FILE", "/path/to/profile_links.csv") +SENT_FILE = os.getenv("SENT_FILE", "/path/to/already_send_message.csv") MAX_MESSAGES = int(os.getenv("MAX_MESSAGES", "10000")) HEADLESS = os.getenv("HEADLESS", "false").lower() == "true" MIN_DELAY = int(os.getenv("MIN_DELAY", "8")) MAX_DELAY = int(os.getenv("MAX_DELAY", "15")) - # ─── Logging ───────────────────────────────────────────────── logging.basicConfig( @@ -85,12 +82,10 @@ ) log = logging.getLogger("soclose") - # ─── Helpers ───────────────────────────────────────────────── - def show_banner(): - """Display the SoClose branded banner.""" + """Display the SoClose branded banner."" console.print() console.print( Panel( @@ -106,7 +101,7 @@ def show_banner(): def extract_username(value: str) -> str: - """Extract Instagram username from a URL or plain username.""" + """Extract Instagram username from a URL or plain username."" value = value.strip().strip("/") if "instagram.com" in value: @@ -120,7 +115,7 @@ def extract_username(value: str) -> str: def load_message(filepath: str) -> str: - """Load the message template from file.""" + """Load the message template from file."" path = Path(filepath) if not path.exists(): console.print(f"[error]Message file not found: {filepath}[/]") @@ -134,7 +129,7 @@ def load_message(filepath: str) -> str: def load_profiles(filepath: str) -> list: - """Load target profile usernames from CSV.""" + """Load target profile usernames from CSV."" path = Path(filepath) if not path.exists(): console.print(f"[error]Profile file not found: {filepath}[/]") @@ -157,7 +152,7 @@ def load_profiles(filepath: str) -> list: def load_sent(filepath: str) -> set: - """Load the set of already-messaged usernames.""" + """Load the set of already-messaged usernames."" path = Path(filepath) if not path.exists(): return set() @@ -179,7 +174,7 @@ def load_sent(filepath: str) -> set: def save_sent(filepath: str, sent: set): - """Save the set of messaged usernames to CSV.""" + """Save the set of messaged usernames to CSV."" with open(filepath, "w", newline="", encoding="utf-8") as f: writer = csv.writer(f) writer.writerow(["Profile Link"]) @@ -188,17 +183,16 @@ def save_sent(filepath: str, sent: set): def random_delay(min_sec=None, max_sec=None): - """Sleep for a random duration to mimic human behavior.""" + """Sleep for a random duration to mimic human behavior."" lo = min_sec if min_sec is not None else MIN_DELAY hi = max_sec if max_sec is not None else MAX_DELAY time.sleep(random.uniform(lo, hi)) - # ─── Browser ───────────────────────────────────────────────── def create_driver(): - """Initialize and return the browser driver.""" + """Initialize and return the browser driver."" if BROWSER == "chrome": options = webdriver.ChromeOptions() options.add_argument("--disable-notifications") @@ -219,12 +213,11 @@ def create_driver(): driver.maximize_window() return driver - # ─── Instagram Actions ─────────────────────────────────────── def dismiss_popup(driver, timeout=5): - """Try to dismiss common Instagram popups (cookies, notifications).""" + """Try to dismiss common Instagram popups (cookies, notifications)."" popup_xpaths = [ "//button[contains(text(), 'Allow')]", "//button[contains(text(), 'Accept')]", @@ -248,7 +241,7 @@ def dismiss_popup(driver, timeout=5): def login(driver): - """Log in to Instagram.""" + """Log in to Instagram."" console.print("[info]Navigating to Instagram login...[/]") driver.get("https://www.instagram.com/accounts/login/") random_delay(5, 8) @@ -293,203 +286,77 @@ def login(driver): def find_and_click_message_button(driver) -> bool: - """Find and click the Message button on a profile page.""" + """Find and click the Message button on a profile page."" selectors = [ (By.XPATH, "//div[@role='button'][text()='Message']"), (By.XPATH, "//div[text()='Message']/ancestor::*[@role='button']"), (By.XPATH, "//div[text()='Message']"), - (By.XPATH, "//button[text()='Message']"), - (By.XPATH, "//div[text()='Envoyer un message']"), - (By.XPATH, "//div[text()='Envoyer message']"), ] - for by, selector in selectors: + for selector in selectors: try: - btn = WebDriverWait(driver, 6).until( - EC.element_to_be_clickable((by, selector)) + btn = WebDriverWait(driver, 5).until( + EC.element_to_be_clickable(selector) ) btn.click() return True - except ( - TimeoutException, - ElementClickInterceptedException, - StaleElementReferenceException, - NoSuchElementException, - ): + except (TimeoutException, ElementClickInterceptedException): continue - return False def send_message(driver, message: str) -> bool: - """Type and send a message in the DM chat window.""" + """Send a message to the current profile page."" try: - random_delay(3, 5) - - # Try multiple selectors for the message input - input_selectors = [ - (By.XPATH, "//div[@role='textbox'][@contenteditable='true']"), - ( - By.XPATH, - "//textarea[contains(@placeholder, 'Message') or contains(@placeholder, 'message')]", - ), - (By.TAG_NAME, "textarea"), - ] - - input_field = None - for by, selector in input_selectors: - try: - input_field = WebDriverWait(driver, 10).until( - EC.element_to_be_clickable((by, selector)) - ) - break - except TimeoutException: - continue - - if not input_field: - log.error("Could not find message input field") - return False - - input_field.click() - time.sleep(1) - - # Send message with line breaks preserved - lines = message.split("\n") - for i, line in enumerate(lines): - ActionChains(driver).send_keys(line).perform() - if i < len(lines) - 1: - ActionChains(driver).key_down(Keys.SHIFT).send_keys( - Keys.ENTER - ).key_up(Keys.SHIFT).perform() - time.sleep(0.3) - - # Press Enter to send - time.sleep(1) - ActionChains(driver).send_keys(Keys.RETURN).perform() - random_delay(2, 4) + input_box = WebDriverWait(driver, 5).until( + EC.visibility_of_element_located((By.XPATH, "//textarea[@placeholder='Message']")) + ) + input_box.clear() + input_box.send_keys(message) + time.sleep(0.5) + send_button = driver.find_element(By.XPATH, "//button[contains(text(), 'Send')]" + send_button.click() return True - - except Exception as e: - log.error(f"Failed to send message: {e}") + except (TimeoutException, NoSuchElementException): return False -# ─── Main ──────────────────────────────────────────────────── - - -def run(): - """Main execution flow.""" +def main(): + console.print("[info]Starting Instagram DM Bulk Automation...[/]") show_banner() - # Validate credentials - if not INSTAGRAM_EMAIL or not INSTAGRAM_PASSWORD: - console.print( - Panel( - "[error]Missing credentials.[/]\n\n" - "Set [bold]INSTAGRAM_EMAIL[/] and [bold]INSTAGRAM_PASSWORD[/] " - "in your [bold].env[/] file.\n" - "See [bold].env.example[/] for reference.", - title="[error]Configuration Error[/]", - border_style="red", - ) - ) - sys.exit(1) - - # Load data - message = load_message(MESSAGE_FILE) - profiles = load_profiles(PROFILES_FILE) - sent = load_sent(SENT_FILE) - - # Filter already-sent profiles - remaining = [p for p in profiles if p not in sent] - console.print(f"[info]Profiles to process:[/] {len(remaining)} / {len(profiles)}") - - if not remaining: - console.print( - "[warning]No new profiles to message. Add profiles to profile_links.csv.[/]" - ) - sys.exit(0) - - # Launch browser - console.print(f"[info]Launching {BROWSER.title()} browser...[/]") driver = create_driver() - try: login(driver) - - count = 0 - - with Progress( - SpinnerColumn(style="#575ECF"), - TextColumn("[bold #575ECF]{task.description}"), - BarColumn(complete_style="#575ECF", finished_style="green"), - TaskProgressColumn(), - console=console, - ) as progress: - total = min(len(remaining), MAX_MESSAGES) - task = progress.add_task("Sending messages", total=total) - - for username in remaining: - if count >= MAX_MESSAGES: - console.print( - f"\n[warning]Reached max messages limit ({MAX_MESSAGES})[/]" - ) + sent = load_sent(SENT_FILE) + profiles = load_profiles(PROFILES_FILE) + + progress = Progress( + SpinnerColumn(), + TextColumn("[cyan]{task.description}[/]"), + BarColumn(complete_style="green"), + TaskProgressColumn() + ) + with console.status(f"[bold #575ECF]Sending messages...[/]") as status: + for profile in progress.track(profiles, description="Profiles"): + if len(sent) >= MAX_MESSAGES: break - progress.update(task, description=f"Processing @{username}") - - # Navigate to profile - driver.get(f"https://www.instagram.com/{username}/") - random_delay(5, 10) + driver.get(f'https://www.instagram.com/{profile}/') + random_delay() - # Find and click Message button if find_and_click_message_button(driver): - random_delay(3, 6) - + message = load_message(MESSAGE_FILE) if send_message(driver, message): - sent.add(username) + sent.add(profile) save_sent(SENT_FILE, sent) - count += 1 - progress.advance(task) - console.print( - f" [success]Sent to @{username} ({count}/{total})[/]" - ) - else: - console.print( - f" [error]Failed to send to @{username}[/]" - ) - else: - console.print( - f" [muted]No Message button for @{username} — skipped[/]" - ) - sent.add(username) - save_sent(SENT_FILE, sent) - progress.advance(task) + console.print(f"[success]Message sent to {profile}[/]") - # Human-like delay between profiles random_delay() - - console.print() - console.print( - Panel( - f"[success]{count} messages sent successfully.[/]", - title="[bold #575ECF]Complete[/]", - border_style="#575ECF", - ) - ) - - except KeyboardInterrupt: - console.print("\n[warning]Interrupted. Progress saved.[/]") - except WebDriverException as e: - log.error(f"Browser error: {e}") finally: - try: - driver.quit() - except Exception: - pass - console.print("[muted]Browser closed.[/]") - + driver.quit() + console.print("[info]Automation complete.[/]") if __name__ == "__main__": - run() + main() \ No newline at end of file