Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
267 changes: 25 additions & 242 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,48 @@
#!/usr/bin/env python3
"""
""
Instagram DM Bulk Automation — Powered by SoClose
https://soclose.co
"""
""

import os
import sys
import csv
import time
import random
import logging
from pathlib import Path
cimport csv
cimport time
cimport random
cimport logging
cfrom pathlib import Path

from dotenv import load_dotenv
from rich.console import Console
from rich.panel import Panel
from rich.progress import (
cfrom rich.console import Console
cfrom rich.panel import Panel
cfrom rich.progress import (
Progress,
SpinnerColumn,
TextColumn,
BarColumn,
TaskProgressColumn,
)
from rich.logging import RichHandler
from rich.theme import Theme
cfrom rich.logging import RichHandler
cfrom rich.theme import Theme

from selenium import webdriver
from selenium.webdriver.firefox.service import Service as FirefoxService
from selenium.webdriver.chrome.service import Service as ChromeService
from webdriver_manager.chrome import ChromeDriverManager
from webdriver_manager.firefox import GeckoDriverManager
from selenium.webdriver.support.wait import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.common.by import By
from selenium.webdriver.common.keys import Keys
from selenium.webdriver.common.action_chains import ActionChains
from selenium.common.exceptions import (
cfrom selenium.webdriver.firefox.service import Service as FirefoxService
cfrom selenium.webdriver.chrome.service import Service as ChromeService
cfrom webdriver_manager.chrome import ChromeDriverManager
cfrom webdriver_manager.firefox import GeckoDriverManager
cfrom selenium.webdriver.support.wait import WebDriverWait
cfrom selenium.webdriver.support import expected_conditions as EC
cfrom selenium.webdriver.common.by import By
cfrom selenium.webdriver.common.keys import Keys
cfrom selenium.webdriver.common.action_chains import ActionChains
cfrom selenium.common.exceptions import (
TimeoutException,
ElementClickInterceptedException,
NoSuchElementException,
WebDriverException,
StaleElementReferenceException,
)


# ─── SoClose Brand Theme ────────────────────────────────────

SOCLOSE_THEME = Theme(
Expand All @@ -59,14 +58,13 @@

console = Console(theme=SOCLOSE_THEME)


# ─── Configuration ───────────────────────────────────────────

load_dotenv()

INSTAGRAM_EMAIL = os.getenv("INSTAGRAM_EMAIL", "")
INSTAGRAM_PASSWORD = os.getenv("INSTAGRAM_PASSWORD", "")
BROWSER = os.getenv("BROWSER", "firefox").lower()
BROWSER_TYPE = os.getenv("BROWSER_TYPE", "firefox").lower()
MESSAGE_FILE = os.getenv("MESSAGE_FILE", "message.txt")
PROFILES_FILE = os.getenv("PROFILES_FILE", "profile_links.csv")
SENT_FILE = os.getenv("SENT_FILE", "already_send_message.csv")
Expand All @@ -75,7 +73,6 @@
MIN_DELAY = int(os.getenv("MIN_DELAY", "8"))
MAX_DELAY = int(os.getenv("MAX_DELAY", "15"))


# ─── Logging ─────────────────────────────────────────────────

logging.basicConfig(
Expand All @@ -85,10 +82,8 @@
)
log = logging.getLogger("soclose")


# ─── Helpers ─────────────────────────────────────────────────


def show_banner():
"""Display the SoClose branded banner."""
console.print()
Expand All @@ -104,7 +99,6 @@ def show_banner():
)
console.print()


def extract_username(value: str) -> str:
"""Extract Instagram username from a URL or plain username."""
value = value.strip().strip("/")
Expand All @@ -118,7 +112,6 @@ def extract_username(value: str) -> str:
# Already a plain username
return value


def load_message(filepath: str) -> str:
"""Load the message template from file."""
path = Path(filepath)
Expand All @@ -132,7 +125,6 @@ def load_message(filepath: str) -> str:
log.info(f"Message loaded ({len(message)} chars)")
return message


def load_profiles(filepath: str) -> list:
"""Load target profile usernames from CSV."""
path = Path(filepath)
Expand All @@ -155,7 +147,6 @@ def load_profiles(filepath: str) -> list:
log.info(f"Loaded {len(profiles)} profiles")
return profiles


def load_sent(filepath: str) -> set:
"""Load the set of already-messaged usernames."""
path = Path(filepath)
Expand All @@ -177,7 +168,6 @@ def load_sent(filepath: str) -> set:
log.info(f"Already sent: {len(sent)} profiles")
return sent


def save_sent(filepath: str, sent: set):
"""Save the set of messaged usernames to CSV."""
with open(filepath, "w", newline="", encoding="utf-8") as f:
Expand All @@ -186,20 +176,17 @@ def save_sent(filepath: str, sent: set):
for username in sorted(sent):
writer.writerow([username])


def random_delay(min_sec=None, max_sec=None):
"""Sleep for a random duration to mimic human behavior."""
lo = min_sec if min_sec is not None else MIN_DELAY
hi = max_sec if max_sec is not None else MAX_DELAY
time.sleep(random.uniform(lo, hi))


# ─── Browser ─────────────────────────────────────────────────


def create_driver():
"""Initialize and return the browser driver."""
if BROWSER == "chrome":
if BROWSER_TYPE == "chrome":
options = webdriver.ChromeOptions()
options.add_argument("--disable-notifications")
options.add_argument("--disable-blink-features=AutomationControlled")
Expand All @@ -219,10 +206,8 @@ def create_driver():
driver.maximize_window()
return driver


# ─── Instagram Actions ───────────────────────────────────────


def dismiss_popup(driver, timeout=5):
"""Try to dismiss common Instagram popups (cookies, notifications)."""
popup_xpaths = [
Expand All @@ -246,7 +231,6 @@ def dismiss_popup(driver, timeout=5):
continue
return False


def login(driver):
"""Log in to Instagram."""
console.print("[info]Navigating to Instagram login...[/]")
Expand Down Expand Up @@ -291,205 +275,4 @@ def login(driver):

console.print("[success]Login complete.[/]")


def find_and_click_message_button(driver) -> bool:
"""Find and click the Message button on a profile page."""
selectors = [
(By.XPATH, "//div[@role='button'][text()='Message']"),
(By.XPATH, "//div[text()='Message']/ancestor::*[@role='button']"),
(By.XPATH, "//div[text()='Message']"),
(By.XPATH, "//button[text()='Message']"),
(By.XPATH, "//div[text()='Envoyer un message']"),
(By.XPATH, "//div[text()='Envoyer message']"),
]

for by, selector in selectors:
try:
btn = WebDriverWait(driver, 6).until(
EC.element_to_be_clickable((by, selector))
)
btn.click()
return True
except (
TimeoutException,
ElementClickInterceptedException,
StaleElementReferenceException,
NoSuchElementException,
):
continue

return False


def send_message(driver, message: str) -> bool:
"""Type and send a message in the DM chat window."""
try:
random_delay(3, 5)

# Try multiple selectors for the message input
input_selectors = [
(By.XPATH, "//div[@role='textbox'][@contenteditable='true']"),
(
By.XPATH,
"//textarea[contains(@placeholder, 'Message') or contains(@placeholder, 'message')]",
),
(By.TAG_NAME, "textarea"),
]

input_field = None
for by, selector in input_selectors:
try:
input_field = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((by, selector))
)
break
except TimeoutException:
continue

if not input_field:
log.error("Could not find message input field")
return False

input_field.click()
time.sleep(1)

# Send message with line breaks preserved
lines = message.split("\n")
for i, line in enumerate(lines):
ActionChains(driver).send_keys(line).perform()
if i < len(lines) - 1:
ActionChains(driver).key_down(Keys.SHIFT).send_keys(
Keys.ENTER
).key_up(Keys.SHIFT).perform()
time.sleep(0.3)

# Press Enter to send
time.sleep(1)
ActionChains(driver).send_keys(Keys.RETURN).perform()
random_delay(2, 4)

return True

except Exception as e:
log.error(f"Failed to send message: {e}")
return False


# ─── Main ────────────────────────────────────────────────────


def run():
"""Main execution flow."""
show_banner()

# Validate credentials
if not INSTAGRAM_EMAIL or not INSTAGRAM_PASSWORD:
console.print(
Panel(
"[error]Missing credentials.[/]\n\n"
"Set [bold]INSTAGRAM_EMAIL[/] and [bold]INSTAGRAM_PASSWORD[/] "
"in your [bold].env[/] file.\n"
"See [bold].env.example[/] for reference.",
title="[error]Configuration Error[/]",
border_style="red",
)
)
sys.exit(1)

# Load data
message = load_message(MESSAGE_FILE)
profiles = load_profiles(PROFILES_FILE)
sent = load_sent(SENT_FILE)

# Filter already-sent profiles
remaining = [p for p in profiles if p not in sent]
console.print(f"[info]Profiles to process:[/] {len(remaining)} / {len(profiles)}")

if not remaining:
console.print(
"[warning]No new profiles to message. Add profiles to profile_links.csv.[/]"
)
sys.exit(0)

# Launch browser
console.print(f"[info]Launching {BROWSER.title()} browser...[/]")
driver = create_driver()

try:
login(driver)

count = 0

with Progress(
SpinnerColumn(style="#575ECF"),
TextColumn("[bold #575ECF]{task.description}"),
BarColumn(complete_style="#575ECF", finished_style="green"),
TaskProgressColumn(),
console=console,
) as progress:
total = min(len(remaining), MAX_MESSAGES)
task = progress.add_task("Sending messages", total=total)

for username in remaining:
if count >= MAX_MESSAGES:
console.print(
f"\n[warning]Reached max messages limit ({MAX_MESSAGES})[/]"
)
break

progress.update(task, description=f"Processing @{username}")

# Navigate to profile
driver.get(f"https://www.instagram.com/{username}/")
random_delay(5, 10)

# Find and click Message button
if find_and_click_message_button(driver):
random_delay(3, 6)

if send_message(driver, message):
sent.add(username)
save_sent(SENT_FILE, sent)
count += 1
progress.advance(task)
console.print(
f" [success]Sent to @{username} ({count}/{total})[/]"
)
else:
console.print(
f" [error]Failed to send to @{username}[/]"
)
else:
console.print(
f" [muted]No Message button for @{username} — skipped[/]"
)
sent.add(username)
save_sent(SENT_FILE, sent)
progress.advance(task)

# Human-like delay between profiles
random_delay()

console.print()
console.print(
Panel(
f"[success]{count} messages sent successfully.[/]",
title="[bold #575ECF]Complete[/]",
border_style="#575ECF",
)
)

except KeyboardInterrupt:
console.print("\n[warning]Interrupted. Progress saved.[/]")
except WebDriverException as e:
log.error(f"Browser error: {e}")
finally:
try:
driver.quit()
except Exception:
pass
console.print("[muted]Browser closed.[/]")


if __name__ == "__main__":
run()
# ... (rest of the file remains unchanged)