Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
225 changes: 9 additions & 216 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
#!/usr/bin/env python3
"""
"
Instagram DM Bulk Automation — Powered by SoClose
https://soclose.co
"""
"

import os
import sys
import csv
csv
import time
import random
import logging
Expand Down Expand Up @@ -43,7 +43,6 @@
StaleElementReferenceException,
)


# ─── SoClose Brand Theme ────────────────────────────────────

SOCLOSE_THEME = Theme(
Expand All @@ -59,7 +58,6 @@

console = Console(theme=SOCLOSE_THEME)


# ─── Configuration ───────────────────────────────────────────

load_dotenv()
Expand All @@ -75,7 +73,6 @@
MIN_DELAY = int(os.getenv("MIN_DELAY", "8"))
MAX_DELAY = int(os.getenv("MAX_DELAY", "15"))


# ─── Logging ─────────────────────────────────────────────────

logging.basicConfig(
Expand All @@ -85,10 +82,8 @@
)
log = logging.getLogger("soclose")


# ─── Helpers ─────────────────────────────────────────────────


def show_banner():
"""Display the SoClose branded banner."""
console.print()
Expand All @@ -104,7 +99,6 @@ def show_banner():
)
console.print()


def extract_username(value: str) -> str:
"""Extract Instagram username from a URL or plain username."""
value = value.strip().strip("/")
Expand All @@ -118,7 +112,6 @@ def extract_username(value: str) -> str:
# Already a plain username
return value


def load_message(filepath: str) -> str:
"""Load the message template from file."""
path = Path(filepath)
Expand All @@ -132,7 +125,6 @@ def load_message(filepath: str) -> str:
log.info(f"Message loaded ({len(message)} chars)")
return message


def load_profiles(filepath: str) -> list:
"""Load target profile usernames from CSV."""
path = Path(filepath)
Expand All @@ -155,7 +147,6 @@ def load_profiles(filepath: str) -> list:
log.info(f"Loaded {len(profiles)} profiles")
return profiles


def load_sent(filepath: str) -> set:
"""Load the set of already-messaged usernames."""
path = Path(filepath)
Expand All @@ -177,7 +168,6 @@ def load_sent(filepath: str) -> set:
log.info(f"Already sent: {len(sent)} profiles")
return sent


def save_sent(filepath: str, sent: set):
"""Save the set of messaged usernames to CSV."""
with open(filepath, "w", newline="", encoding="utf-8") as f:
Expand All @@ -186,17 +176,14 @@ def save_sent(filepath: str, sent: set):
for username in sorted(sent):
writer.writerow([username])


def random_delay(min_sec=None, max_sec=None):
"""Sleep for a random duration to mimic human behavior."""
lo = min_sec if min_sec is not None else MIN_DELAY
hi = max_sec if max_sec is not None else MAX_DELAY
time.sleep(random.uniform(lo, hi))


# ─── Browser ─────────────────────────────────────────────────


def create_driver():
"""Initialize and return the browser driver."""
if BROWSER == "chrome":
Expand All @@ -219,10 +206,8 @@ def create_driver():
driver.maximize_window()
return driver


# ─── Instagram Actions ───────────────────────────────────────


def dismiss_popup(driver, timeout=5):
"""Try to dismiss common Instagram popups (cookies, notifications)."""
popup_xpaths = [
Expand All @@ -246,9 +231,12 @@ def dismiss_popup(driver, timeout=5):
continue
return False


def login(driver):
"""Log in to Instagram."""
if not INSTAGRAM_EMAIL or not INSTAGRAM_PASSWORD:
console.print("[error]Instagram email and password are not set. Please check your environment variables.[/]", style="bold red")
sys.exit(1)

console.print("[info]Navigating to Instagram login...[/]")
driver.get("https://www.instagram.com/accounts/login/")
random_delay(5, 8)
Expand Down Expand Up @@ -279,8 +267,7 @@ def login(driver):

# Wait for user to handle 2FA or any manual verification
console.print(
"[warning]If 2FA or a challenge appears, complete it in the browser now.[/]"
)
"[warning]If 2FA or a challenge appears, complete it in the browser now.[/]")
input("\n Press ENTER once you are logged in and ready to continue... ")
console.print()

Expand All @@ -291,205 +278,11 @@ def login(driver):

console.print("[success]Login complete.[/]")


def find_and_click_message_button(driver) -> bool:
"""Find and click the Message button on a profile page."""
selectors = [
(By.XPATH, "//div[@role='button'][text()='Message']"),
(By.XPATH, "//div[text()='Message']/ancestor::*[@role='button']"),
(By.XPATH, "//div[text()='Message']"),
(By.XPATH, "//button[text()='Message']"),
(By.XPATH, "//div[text()='Envoyer un message']"),
(By.XPATH, "//div[text()='Envoyer message']"),
]

for by, selector in selectors:
try:
btn = WebDriverWait(driver, 6).until(
EC.element_to_be_clickable((by, selector))
)
btn.click()
return True
except (
TimeoutException,
ElementClickInterceptedException,
StaleElementReferenceException,
NoSuchElementException,
):
continue

return False


def send_message(driver, message: str) -> bool:
"""Type and send a message in the DM chat window."""
try:
random_delay(3, 5)

# Try multiple selectors for the message input
input_selectors = [
(By.XPATH, "//div[@role='textbox'][@contenteditable='true']"),
(
By.XPATH,
"//textarea[contains(@placeholder, 'Message') or contains(@placeholder, 'message')]",
),
(By.TAG_NAME, "textarea"),
]

input_field = None
for by, selector in input_selectors:
try:
input_field = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((by, selector))
)
break
except TimeoutException:
continue

if not input_field:
log.error("Could not find message input field")
return False

input_field.click()
time.sleep(1)

# Send message with line breaks preserved
lines = message.split("\n")
for i, line in enumerate(lines):
ActionChains(driver).send_keys(line).perform()
if i < len(lines) - 1:
ActionChains(driver).key_down(Keys.SHIFT).send_keys(
Keys.ENTER
).key_up(Keys.SHIFT).perform()
time.sleep(0.3)

# Press Enter to send
time.sleep(1)
ActionChains(driver).send_keys(Keys.RETURN).perform()
random_delay(2, 4)

return True

except Exception as e:
log.error(f"Failed to send message: {e}")
return False


# ─── Main ────────────────────────────────────────────────────


def run():
"""Main execution flow."""
show_banner()

# Validate credentials
if not INSTAGRAM_EMAIL or not INSTAGRAM_PASSWORD:
console.print(
Panel(
"[error]Missing credentials.[/]\n\n"
"Set [bold]INSTAGRAM_EMAIL[/] and [bold]INSTAGRAM_PASSWORD[/] "
"in your [bold].env[/] file.\n"
"See [bold].env.example[/] for reference.",
title="[error]Configuration Error[/]",
border_style="red",
)
)
sys.exit(1)

# Load data
message = load_message(MESSAGE_FILE)
profiles = load_profiles(PROFILES_FILE)
sent = load_sent(SENT_FILE)

# Filter already-sent profiles
remaining = [p for p in profiles if p not in sent]
console.print(f"[info]Profiles to process:[/] {len(remaining)} / {len(profiles)}")

if not remaining:
console.print(
"[warning]No new profiles to message. Add profiles to profile_links.csv.[/]"
)
sys.exit(0)

# Launch browser
console.print(f"[info]Launching {BROWSER.title()} browser...[/]")
driver = create_driver()

try:
login(driver)

count = 0

with Progress(
SpinnerColumn(style="#575ECF"),
TextColumn("[bold #575ECF]{task.description}"),
BarColumn(complete_style="#575ECF", finished_style="green"),
TaskProgressColumn(),
console=console,
) as progress:
total = min(len(remaining), MAX_MESSAGES)
task = progress.add_task("Sending messages", total=total)

for username in remaining:
if count >= MAX_MESSAGES:
console.print(
f"\n[warning]Reached max messages limit ({MAX_MESSAGES})[/]"
)
break

progress.update(task, description=f"Processing @{username}")

# Navigate to profile
driver.get(f"https://www.instagram.com/{username}/")
random_delay(5, 10)

# Find and click Message button
if find_and_click_message_button(driver):
random_delay(3, 6)

if send_message(driver, message):
sent.add(username)
save_sent(SENT_FILE, sent)
count += 1
progress.advance(task)
console.print(
f" [success]Sent to @{username} ({count}/{total})[/]"
)
else:
console.print(
f" [error]Failed to send to @{username}[/]"
)
else:
console.print(
f" [muted]No Message button for @{username} — skipped[/]"
)
sent.add(username)
save_sent(SENT_FILE, sent)
progress.advance(task)

# Human-like delay between profiles
random_delay()

console.print()
console.print(
Panel(
f"[success]{count} messages sent successfully.[/]",
title="[bold #575ECF]Complete[/]",
border_style="#575ECF",
)
)

except KeyboardInterrupt:
console.print("\n[warning]Interrupted. Progress saved.[/]")
except WebDriverException as e:
log.error(f"Browser error: {e}")
finally:
try:
driver.quit()
except Exception:
pass
console.print("[muted]Browser closed.[/]")


if __name__ == "__main__":
run()
... (truncated, 196 more lines)