Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
237 changes: 52 additions & 185 deletions main.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
#!/usr/bin/env python3
"""
""
Instagram DM Bulk Automation — Powered by SoClose
https://soclose.co
"""
""]

import os
import sys
Expand Down Expand Up @@ -43,7 +43,6 @@
StaleElementReferenceException,
)


# ─── SoClose Brand Theme ────────────────────────────────────

SOCLOSE_THEME = Theme(
Expand All @@ -59,23 +58,21 @@

console = Console(theme=SOCLOSE_THEME)


# ─── Configuration ───────────────────────────────────────────

load_dotenv()

INSTAGRAM_EMAIL = os.getenv("INSTAGRAM_EMAIL", "")
INSTAGRAM_PASSWORD = os.getenv("INSTAGRAM_PASSWORD", "")
BROWSER = os.getenv("BROWSER", "firefox").lower()
MESSAGE_FILE = os.getenv("MESSAGE_FILE", "message.txt")
PROFILES_FILE = os.getenv("PROFILES_FILE", "profile_links.csv")
SENT_FILE = os.getenv("SENT_FILE", "already_send_message.csv")
MESSAGE_FILE = os.getenv("MESSAGE_FILE", "/path/to/message.txt")
PROFILES_FILE = os.getenv("PROFILES_FILE", "/path/to/profile_links.csv")
SENT_FILE = os.getenv("SENT_FILE", "/path/to/already_send_message.csv")
MAX_MESSAGES = int(os.getenv("MAX_MESSAGES", "10000"))
HEADLESS = os.getenv("HEADLESS", "false").lower() == "true"
MIN_DELAY = int(os.getenv("MIN_DELAY", "8"))
MAX_DELAY = int(os.getenv("MAX_DELAY", "15"))


# ─── Logging ─────────────────────────────────────────────────

logging.basicConfig(
Expand All @@ -85,12 +82,10 @@
)
log = logging.getLogger("soclose")


# ─── Helpers ─────────────────────────────────────────────────


def show_banner():
"""Display the SoClose branded banner."""
"""Display the SoClose branded banner.""
console.print()
console.print(
Panel(
Expand All @@ -106,7 +101,7 @@ def show_banner():


def extract_username(value: str) -> str:
"""Extract Instagram username from a URL or plain username."""
"""Extract Instagram username from a URL or plain username.""
value = value.strip().strip("/")

if "instagram.com" in value:
Expand All @@ -120,7 +115,7 @@ def extract_username(value: str) -> str:


def load_message(filepath: str) -> str:
"""Load the message template from file."""
"""Load the message template from file.""
path = Path(filepath)
if not path.exists():
console.print(f"[error]Message file not found: {filepath}[/]")
Expand All @@ -134,7 +129,7 @@ def load_message(filepath: str) -> str:


def load_profiles(filepath: str) -> list:
"""Load target profile usernames from CSV."""
"""Load target profile usernames from CSV.""
path = Path(filepath)
if not path.exists():
console.print(f"[error]Profile file not found: {filepath}[/]")
Expand All @@ -157,7 +152,7 @@ def load_profiles(filepath: str) -> list:


def load_sent(filepath: str) -> set:
"""Load the set of already-messaged usernames."""
"""Load the set of already-messaged usernames.""
path = Path(filepath)
if not path.exists():
return set()
Expand All @@ -179,7 +174,7 @@ def load_sent(filepath: str) -> set:


def save_sent(filepath: str, sent: set):
"""Save the set of messaged usernames to CSV."""
"""Save the set of messaged usernames to CSV.""
with open(filepath, "w", newline="", encoding="utf-8") as f:
writer = csv.writer(f)
writer.writerow(["Profile Link"])
Expand All @@ -188,17 +183,16 @@ def save_sent(filepath: str, sent: set):


def random_delay(min_sec=None, max_sec=None):
"""Sleep for a random duration to mimic human behavior."""
"""Sleep for a random duration to mimic human behavior.""
lo = min_sec if min_sec is not None else MIN_DELAY
hi = max_sec if max_sec is not None else MAX_DELAY
time.sleep(random.uniform(lo, hi))


# ─── Browser ─────────────────────────────────────────────────


def create_driver():
"""Initialize and return the browser driver."""
"""Initialize and return the browser driver.""
if BROWSER == "chrome":
options = webdriver.ChromeOptions()
options.add_argument("--disable-notifications")
Expand All @@ -219,12 +213,11 @@ def create_driver():
driver.maximize_window()
return driver


# ─── Instagram Actions ───────────────────────────────────────


def dismiss_popup(driver, timeout=5):
"""Try to dismiss common Instagram popups (cookies, notifications)."""
"""Try to dismiss common Instagram popups (cookies, notifications).""
popup_xpaths = [
"//button[contains(text(), 'Allow')]",
"//button[contains(text(), 'Accept')]",
Expand All @@ -248,7 +241,7 @@ def dismiss_popup(driver, timeout=5):


def login(driver):
"""Log in to Instagram."""
"""Log in to Instagram.""
console.print("[info]Navigating to Instagram login...[/]")
driver.get("https://www.instagram.com/accounts/login/")
random_delay(5, 8)
Expand Down Expand Up @@ -293,203 +286,77 @@ def login(driver):


def find_and_click_message_button(driver) -> bool:
"""Find and click the Message button on a profile page."""
"""Find and click the Message button on a profile page.""
selectors = [
(By.XPATH, "//div[@role='button'][text()='Message']"),
(By.XPATH, "//div[text()='Message']/ancestor::*[@role='button']"),
(By.XPATH, "//div[text()='Message']"),
(By.XPATH, "//button[text()='Message']"),
(By.XPATH, "//div[text()='Envoyer un message']"),
(By.XPATH, "//div[text()='Envoyer message']"),
]

for by, selector in selectors:
for selector in selectors:
try:
btn = WebDriverWait(driver, 6).until(
EC.element_to_be_clickable((by, selector))
btn = WebDriverWait(driver, 5).until(
EC.element_to_be_clickable(selector)
)
btn.click()
return True
except (
TimeoutException,
ElementClickInterceptedException,
StaleElementReferenceException,
NoSuchElementException,
):
except (TimeoutException, ElementClickInterceptedException):
continue

return False


def send_message(driver, message: str) -> bool:
"""Type and send a message in the DM chat window."""
"""Send a message to the current profile page.""
try:
random_delay(3, 5)

# Try multiple selectors for the message input
input_selectors = [
(By.XPATH, "//div[@role='textbox'][@contenteditable='true']"),
(
By.XPATH,
"//textarea[contains(@placeholder, 'Message') or contains(@placeholder, 'message')]",
),
(By.TAG_NAME, "textarea"),
]

input_field = None
for by, selector in input_selectors:
try:
input_field = WebDriverWait(driver, 10).until(
EC.element_to_be_clickable((by, selector))
)
break
except TimeoutException:
continue

if not input_field:
log.error("Could not find message input field")
return False

input_field.click()
time.sleep(1)

# Send message with line breaks preserved
lines = message.split("\n")
for i, line in enumerate(lines):
ActionChains(driver).send_keys(line).perform()
if i < len(lines) - 1:
ActionChains(driver).key_down(Keys.SHIFT).send_keys(
Keys.ENTER
).key_up(Keys.SHIFT).perform()
time.sleep(0.3)

# Press Enter to send
time.sleep(1)
ActionChains(driver).send_keys(Keys.RETURN).perform()
random_delay(2, 4)
input_box = WebDriverWait(driver, 5).until(
EC.visibility_of_element_located((By.XPATH, "//textarea[@placeholder='Message']"))
)
input_box.clear()
input_box.send_keys(message)
time.sleep(0.5)

send_button = driver.find_element(By.XPATH, "//button[contains(text(), 'Send')]"
send_button.click()
return True

except Exception as e:
log.error(f"Failed to send message: {e}")
except (TimeoutException, NoSuchElementException):
return False


# ─── Main ────────────────────────────────────────────────────


def run():
"""Main execution flow."""
def main():
console.print("[info]Starting Instagram DM Bulk Automation...[/]")
show_banner()

# Validate credentials
if not INSTAGRAM_EMAIL or not INSTAGRAM_PASSWORD:
console.print(
Panel(
"[error]Missing credentials.[/]\n\n"
"Set [bold]INSTAGRAM_EMAIL[/] and [bold]INSTAGRAM_PASSWORD[/] "
"in your [bold].env[/] file.\n"
"See [bold].env.example[/] for reference.",
title="[error]Configuration Error[/]",
border_style="red",
)
)
sys.exit(1)

# Load data
message = load_message(MESSAGE_FILE)
profiles = load_profiles(PROFILES_FILE)
sent = load_sent(SENT_FILE)

# Filter already-sent profiles
remaining = [p for p in profiles if p not in sent]
console.print(f"[info]Profiles to process:[/] {len(remaining)} / {len(profiles)}")

if not remaining:
console.print(
"[warning]No new profiles to message. Add profiles to profile_links.csv.[/]"
)
sys.exit(0)

# Launch browser
console.print(f"[info]Launching {BROWSER.title()} browser...[/]")
driver = create_driver()

try:
login(driver)

count = 0

with Progress(
SpinnerColumn(style="#575ECF"),
TextColumn("[bold #575ECF]{task.description}"),
BarColumn(complete_style="#575ECF", finished_style="green"),
TaskProgressColumn(),
console=console,
) as progress:
total = min(len(remaining), MAX_MESSAGES)
task = progress.add_task("Sending messages", total=total)

for username in remaining:
if count >= MAX_MESSAGES:
console.print(
f"\n[warning]Reached max messages limit ({MAX_MESSAGES})[/]"
)
sent = load_sent(SENT_FILE)
profiles = load_profiles(PROFILES_FILE)

progress = Progress(
SpinnerColumn(),
TextColumn("[cyan]{task.description}[/]"),
BarColumn(complete_style="green"),
TaskProgressColumn()
)
with console.status(f"[bold #575ECF]Sending messages...[/]") as status:
for profile in progress.track(profiles, description="Profiles"):
if len(sent) >= MAX_MESSAGES:
break

progress.update(task, description=f"Processing @{username}")

# Navigate to profile
driver.get(f"https://www.instagram.com/{username}/")
random_delay(5, 10)
driver.get(f'https://www.instagram.com/{profile}/')
random_delay()

# Find and click Message button
if find_and_click_message_button(driver):
random_delay(3, 6)

message = load_message(MESSAGE_FILE)
if send_message(driver, message):
sent.add(username)
sent.add(profile)
save_sent(SENT_FILE, sent)
count += 1
progress.advance(task)
console.print(
f" [success]Sent to @{username} ({count}/{total})[/]"
)
else:
console.print(
f" [error]Failed to send to @{username}[/]"
)
else:
console.print(
f" [muted]No Message button for @{username} — skipped[/]"
)
sent.add(username)
save_sent(SENT_FILE, sent)
progress.advance(task)
console.print(f"[success]Message sent to {profile}[/]")

# Human-like delay between profiles
random_delay()

console.print()
console.print(
Panel(
f"[success]{count} messages sent successfully.[/]",
title="[bold #575ECF]Complete[/]",
border_style="#575ECF",
)
)

except KeyboardInterrupt:
console.print("\n[warning]Interrupted. Progress saved.[/]")
except WebDriverException as e:
log.error(f"Browser error: {e}")
finally:
try:
driver.quit()
except Exception:
pass
console.print("[muted]Browser closed.[/]")

driver.quit()
console.print("[info]Automation complete.[/]")

if __name__ == "__main__":
run()
main()