diff --git a/README.md b/README.md index 123cf23..c8cfb3e 100644 --- a/README.md +++ b/README.md @@ -23,12 +23,9 @@ # YouTube Viewer Simple program to increase YouTube views written in Python. Works with live stream too. -# NOTICE -**Not working anymore. If I can fix it there will be a new release. Until then don't use it.** - **Disclaimer:** This has been developed for educational purposes only. Any action you take using this script is strictly at your own risk. I will not be liable for any losses or damages you face using this script. -**Cons:** There will be some view drop always. +**Cons:** There will ALWAYS be view drops. # Support Consider a donation to keep this project alive and for the countless hours of works and testing :) @@ -66,7 +63,7 @@ Simple program to increase YouTube views written in Python. Works with live stre # Requirements - * **Python 3.7.x-3.11.x** + * **Python 3.7.x-3.12.x** * High speed Internet Connection * Good proxy list (http, https, socks4, socks5) * Google Chrome installed on your OS (not Chromium) @@ -75,29 +72,34 @@ Simple program to increase YouTube views written in Python. Works with live stre # Features * YouTube default, live streaming and YouTube Music support * Multithreaded and Dynamic thread support - * Auto download updated chrome driver whenever user's Google Chrome version is updated - * Patch chrome driver on the start of every thread by undetected-chromedriver + * Auto download updated chrome driver using undetected-chromedriver * Proxy support - * location : text file (must be on path) / proxy API (should work with most of the proxy providers) + * location : text file (must be on path) / proxy API (works with most proxy providers) * type : http, https, socks4, socks5 * format : `ip:port`, `user:pass@ip:port`, `ip:port:user:pass` - * proxy refresh after a certain time specified by the user + * proxy refresh after a specified time interval * rotating proxy support - * chrome v80+ randomized user agent based on platform - * canvas,audio,font,webgl fingerprint defender and IP leak prevent by webrtc control - * geolocation, timezone, referer spoofing - * can add extra extensions in the `extension/custom_extension/` folder - * direct link or search *keyword* on YouTube then watch the video by matching exact video *title* - * modify **urls.txt, search.txt and config.json** on the fly without restarting program - * HTTP api on localhost and a database to store view count - * config.json to save settings - * bypass consent page and several other pop up - * save bandwidth by reducing video quality - * can set higher(100%) watch duration percentage to increase *Watch time*, change playback speed + * Modern Chrome compatibility with randomized user agent + * Basic browser fingerprint protection: + * WebRTC leak prevention + * Timezone and referer spoofing + * Custom extension support via `extension/custom_extension/` folder + * Flexible video targeting: + * Direct video URLs + * Search-based targeting with title matching + * Real-time config updates without restart + * Monitoring and Management: + * HTTP API for status monitoring + * SQLite database for view tracking + * Configurable settings via config.json + * YouTube Interaction Features: + * Consent popup handling + * Bandwidth optimization via quality control + * Configurable watch duration and playback speed * #### Traffic Sources * YouTube Search * Suggested Videos - * External (Google, Yahoo, DuckDuckGo, Bing, Twitter) + * External Sources (Google, DuckDuckGo, Bing) * End Screens * Channel Pages * Direct or unknown @@ -174,7 +176,7 @@ Simple program to increase YouTube views written in Python. Works with live stre * ## Installation - First, make sure you have installed git and Python version between 3.7.x to 3.11.x + First, make sure you have installed git and Python version between 3.7.x to 3.12.x Open command prompt and type ``` @@ -187,9 +189,6 @@ Simple program to increase YouTube views written in Python. Works with live stre python -m pip install --upgrade pip wheel ``` ``` - pip install "setuptools<59" - ``` - ``` pip install -r requirements.txt ``` @@ -262,4 +261,10 @@ Simple program to increase YouTube views written in Python. Works with live stre # Credits - I want to thank all of you who have opened an issue or shared your code snippets or ideas with me! +I want to thank all of you who have opened an issue or shared your code snippets or ideas with me! + +Currently maintained by: +- Original author: MShawon +- Current maintainer: Leaske + +Feel free to contribute by opening issues or submitting pull requests! diff --git a/extension/proxy_auth_10/background.js b/extension/proxy_auth_10/background.js new file mode 100644 index 0000000..bd95c89 --- /dev/null +++ b/extension/proxy_auth_10/background.js @@ -0,0 +1,26 @@ + +var config = { + mode: "fixed_servers", + rules: { + singleProxy: { + scheme: "http", + host: "38.154.227.167", + port: parseInt(5868) + }, + bypassList: ["localhost"] + } + }; +chrome.proxy.settings.set({value: config, scope: "regular"}, function() {}); +function callbackFn(details) { + return { + authCredentials: { + username: "gdfmbipr", + password: "b87b7t83bxoa" + } + }; +} +chrome.webRequest.onAuthRequired.addListener( + callbackFn, + {urls: [""]}, + ['blocking'] +); diff --git a/extension/proxy_auth_10/manifest.json b/extension/proxy_auth_10/manifest.json new file mode 100644 index 0000000..a8e908b --- /dev/null +++ b/extension/proxy_auth_10/manifest.json @@ -0,0 +1,20 @@ + +{ + "version": "1.0.0", + "manifest_version": 2, + "name": "Chrome Proxy", + "permissions": [ + "proxy", + "tabs", + "unlimitedStorage", + "storage", + "", + "webRequest", + "webRequestBlocking" + ], + "background": { + "scripts": ["background.js"] + }, + "minimum_chrome_version":"22.0.0" +} + \ No newline at end of file diff --git a/extension/proxy_auth_2/background.js b/extension/proxy_auth_2/background.js new file mode 100644 index 0000000..063f421 --- /dev/null +++ b/extension/proxy_auth_2/background.js @@ -0,0 +1,26 @@ + +var config = { + mode: "fixed_servers", + rules: { + singleProxy: { + scheme: "http", + host: "188.74.210.207", + port: parseInt(6286|http) + }, + bypassList: ["localhost"] + } + }; +chrome.proxy.settings.set({value: config, scope: "regular"}, function() {}); +function callbackFn(details) { + return { + authCredentials: { + username: "gdfmbipr", + password: "b87b7t83bxoa" + } + }; +} +chrome.webRequest.onAuthRequired.addListener( + callbackFn, + {urls: [""]}, + ['blocking'] +); diff --git a/extension/proxy_auth_2/manifest.json b/extension/proxy_auth_2/manifest.json new file mode 100644 index 0000000..a8e908b --- /dev/null +++ b/extension/proxy_auth_2/manifest.json @@ -0,0 +1,20 @@ + +{ + "version": "1.0.0", + "manifest_version": 2, + "name": "Chrome Proxy", + "permissions": [ + "proxy", + "tabs", + "unlimitedStorage", + "storage", + "", + "webRequest", + "webRequestBlocking" + ], + "background": { + "scripts": ["background.js"] + }, + "minimum_chrome_version":"22.0.0" +} + \ No newline at end of file diff --git a/extension/proxy_auth_4/background.js b/extension/proxy_auth_4/background.js new file mode 100644 index 0000000..90306ac --- /dev/null +++ b/extension/proxy_auth_4/background.js @@ -0,0 +1,26 @@ + +var config = { + mode: "fixed_servers", + rules: { + singleProxy: { + scheme: "http", + host: "185.199.228.220", + port: parseInt(7300|http) + }, + bypassList: ["localhost"] + } + }; +chrome.proxy.settings.set({value: config, scope: "regular"}, function() {}); +function callbackFn(details) { + return { + authCredentials: { + username: "gdfmbipr", + password: "b87b7t83bxoa" + } + }; +} +chrome.webRequest.onAuthRequired.addListener( + callbackFn, + {urls: [""]}, + ['blocking'] +); diff --git a/extension/proxy_auth_4/manifest.json b/extension/proxy_auth_4/manifest.json new file mode 100644 index 0000000..a8e908b --- /dev/null +++ b/extension/proxy_auth_4/manifest.json @@ -0,0 +1,20 @@ + +{ + "version": "1.0.0", + "manifest_version": 2, + "name": "Chrome Proxy", + "permissions": [ + "proxy", + "tabs", + "unlimitedStorage", + "storage", + "", + "webRequest", + "webRequestBlocking" + ], + "background": { + "scripts": ["background.js"] + }, + "minimum_chrome_version":"22.0.0" +} + \ No newline at end of file diff --git a/extension/proxy_auth_5/background.js b/extension/proxy_auth_5/background.js new file mode 100644 index 0000000..5151beb --- /dev/null +++ b/extension/proxy_auth_5/background.js @@ -0,0 +1,26 @@ + +var config = { + mode: "fixed_servers", + rules: { + singleProxy: { + scheme: "http", + host: "188.74.210.21", + port: parseInt(6100|http) + }, + bypassList: ["localhost"] + } + }; +chrome.proxy.settings.set({value: config, scope: "regular"}, function() {}); +function callbackFn(details) { + return { + authCredentials: { + username: "gdfmbipr", + password: "b87b7t83bxoa" + } + }; +} +chrome.webRequest.onAuthRequired.addListener( + callbackFn, + {urls: [""]}, + ['blocking'] +); diff --git a/extension/proxy_auth_5/manifest.json b/extension/proxy_auth_5/manifest.json new file mode 100644 index 0000000..a8e908b --- /dev/null +++ b/extension/proxy_auth_5/manifest.json @@ -0,0 +1,20 @@ + +{ + "version": "1.0.0", + "manifest_version": 2, + "name": "Chrome Proxy", + "permissions": [ + "proxy", + "tabs", + "unlimitedStorage", + "storage", + "", + "webRequest", + "webRequestBlocking" + ], + "background": { + "scripts": ["background.js"] + }, + "minimum_chrome_version":"22.0.0" +} + \ No newline at end of file diff --git a/extension/proxy_auth_6/background.js b/extension/proxy_auth_6/background.js new file mode 100644 index 0000000..4472f4c --- /dev/null +++ b/extension/proxy_auth_6/background.js @@ -0,0 +1,26 @@ + +var config = { + mode: "fixed_servers", + rules: { + singleProxy: { + scheme: "http", + host: "188.74.183.10", + port: parseInt(8279) + }, + bypassList: ["localhost"] + } + }; +chrome.proxy.settings.set({value: config, scope: "regular"}, function() {}); +function callbackFn(details) { + return { + authCredentials: { + username: "gdfmbipr", + password: "b87b7t83bxoa" + } + }; +} +chrome.webRequest.onAuthRequired.addListener( + callbackFn, + {urls: [""]}, + ['blocking'] +); diff --git a/extension/proxy_auth_6/manifest.json b/extension/proxy_auth_6/manifest.json new file mode 100644 index 0000000..a8e908b --- /dev/null +++ b/extension/proxy_auth_6/manifest.json @@ -0,0 +1,20 @@ + +{ + "version": "1.0.0", + "manifest_version": 2, + "name": "Chrome Proxy", + "permissions": [ + "proxy", + "tabs", + "unlimitedStorage", + "storage", + "", + "webRequest", + "webRequestBlocking" + ], + "background": { + "scripts": ["background.js"] + }, + "minimum_chrome_version":"22.0.0" +} + \ No newline at end of file diff --git a/extension/proxy_auth_7/background.js b/extension/proxy_auth_7/background.js new file mode 100644 index 0000000..5e5281f --- /dev/null +++ b/extension/proxy_auth_7/background.js @@ -0,0 +1,26 @@ + +var config = { + mode: "fixed_servers", + rules: { + singleProxy: { + scheme: "http", + host: "154.95.36.199", + port: parseInt(6893) + }, + bypassList: ["localhost"] + } + }; +chrome.proxy.settings.set({value: config, scope: "regular"}, function() {}); +function callbackFn(details) { + return { + authCredentials: { + username: "gdfmbipr", + password: "b87b7t83bxoa" + } + }; +} +chrome.webRequest.onAuthRequired.addListener( + callbackFn, + {urls: [""]}, + ['blocking'] +); diff --git a/extension/proxy_auth_7/manifest.json b/extension/proxy_auth_7/manifest.json new file mode 100644 index 0000000..a8e908b --- /dev/null +++ b/extension/proxy_auth_7/manifest.json @@ -0,0 +1,20 @@ + +{ + "version": "1.0.0", + "manifest_version": 2, + "name": "Chrome Proxy", + "permissions": [ + "proxy", + "tabs", + "unlimitedStorage", + "storage", + "", + "webRequest", + "webRequestBlocking" + ], + "background": { + "scripts": ["background.js"] + }, + "minimum_chrome_version":"22.0.0" +} + \ No newline at end of file diff --git a/extension/proxy_auth_8/background.js b/extension/proxy_auth_8/background.js new file mode 100644 index 0000000..a603128 --- /dev/null +++ b/extension/proxy_auth_8/background.js @@ -0,0 +1,26 @@ + +var config = { + mode: "fixed_servers", + rules: { + singleProxy: { + scheme: "http", + host: "45.94.47.66", + port: parseInt(8110) + }, + bypassList: ["localhost"] + } + }; +chrome.proxy.settings.set({value: config, scope: "regular"}, function() {}); +function callbackFn(details) { + return { + authCredentials: { + username: "gdfmbipr", + password: "b87b7t83bxoa" + } + }; +} +chrome.webRequest.onAuthRequired.addListener( + callbackFn, + {urls: [""]}, + ['blocking'] +); diff --git a/extension/proxy_auth_8/manifest.json b/extension/proxy_auth_8/manifest.json new file mode 100644 index 0000000..a8e908b --- /dev/null +++ b/extension/proxy_auth_8/manifest.json @@ -0,0 +1,20 @@ + +{ + "version": "1.0.0", + "manifest_version": 2, + "name": "Chrome Proxy", + "permissions": [ + "proxy", + "tabs", + "unlimitedStorage", + "storage", + "", + "webRequest", + "webRequestBlocking" + ], + "background": { + "scripts": ["background.js"] + }, + "minimum_chrome_version":"22.0.0" +} + \ No newline at end of file diff --git a/proxy_check.py b/proxy_check.py index aa4443b..807d506 100644 --- a/proxy_check.py +++ b/proxy_check.py @@ -21,6 +21,7 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ + import os import shutil import sys @@ -35,33 +36,41 @@ class bcolors: - HEADER = '\033[95m' - OKBLUE = '\033[94m' - OKCYAN = '\033[96m' - OKGREEN = '\033[92m' - WARNING = '\033[93m' - FAIL = '\033[91m' - ENDC = '\033[0m' - BOLD = '\033[1m' - UNDERLINE = '\033[4m' - - -print(bcolors.OKGREEN + """ + HEADER = "\033[95m" + OKBLUE = "\033[94m" + OKCYAN = "\033[96m" + OKGREEN = "\033[92m" + WARNING = "\033[93m" + FAIL = "\033[91m" + ENDC = "\033[0m" + BOLD = "\033[1m" + UNDERLINE = "\033[4m" + + +print( + bcolors.OKGREEN + + r""" ____ - | _ \ _ __ _____ ___ _ - | |_) | '__/ _ \ \/ / | | | - | __/| | | (_) > <| |_| | - |_| |_|_ \___/_/\_\\__, |_ - / ___| |__ ___|___/| | _____ _ __ + | _ \ _ __ _____ ___ _ + | |_) | '__/ _ \ \/ / | | | + | __/| | | (_) > <| |_| | + |_| |_|_ \___/_/\_\\__, |_ + / ___| |__ ___|___/| | _____ _ __ | | | '_ \ / _ \/ __| |/ / _ \ '__| | |___| | | | __/ (__| < __/ | \____|_| |_|\___|\___|_|\_\___|_| -""" + bcolors.ENDC) +""" + + bcolors.ENDC +) -print(bcolors.OKCYAN + """ +print( + bcolors.OKCYAN + + """ [ GitHub : https://github.com/MShawon/YouTube-Viewer ] -""" + bcolors.ENDC) +""" + + bcolors.ENDC +) checked = {} @@ -70,22 +79,25 @@ class bcolors: def backup(): try: - shutil.copy('GoodProxy.txt', 'ProxyBackup.txt') - print(bcolors.WARNING + - 'GoodProxy.txt backed up in ProxyBackup.txt' + bcolors.ENDC) + shutil.copy("GoodProxy.txt", "ProxyBackup.txt") + print( + bcolors.WARNING + + "GoodProxy.txt backed up in ProxyBackup.txt" + + bcolors.ENDC + ) except Exception: pass - print('', file=open('GoodProxy.txt', 'w')) + print("", file=open("GoodProxy.txt", "w")) def clean_exe_temp(folder): try: - temp_name = sys._MEIPASS.split('\\')[-1] + temp_name = sys._MEIPASS.split("\\")[-1] except Exception: temp_name = None - for f in glob(os.path.join('temp', folder, '*')): + for f in glob(os.path.join("temp", folder, "*")): if temp_name not in f: shutil.rmtree(f, ignore_errors=True) @@ -93,24 +105,23 @@ def clean_exe_temp(folder): def load_proxy(): proxies = [] - filename = input(bcolors.OKBLUE + - 'Enter your proxy file name: ' + bcolors.ENDC) + filename = input(bcolors.OKBLUE + "Enter your proxy file name: " + bcolors.ENDC) - if not os.path.isfile(filename) and filename[-4:] != '.txt': - filename = f'{filename}.txt' + if not os.path.isfile(filename) and filename[-4:] != ".txt": + filename = f"{filename}.txt" try: with open(filename, encoding="utf-8") as fh: - loaded = [x.strip() for x in fh if x.strip() != ''] + loaded = [x.strip() for x in fh if x.strip() != ""] except Exception as e: print(bcolors.FAIL + str(e) + bcolors.ENDC) - input('') + input("") sys.exit() for lines in loaded: - if lines.count(':') == 3: - split = lines.split(':') - lines = f'{split[2]}:{split[-1]}@{split[0]}:{split[1]}' + if lines.count(":") == 3: + split = lines.split(":") + lines = f"{split[2]}:{split[-1]}@{split[0]}:{split[1]}" proxies.append(lines) return proxies @@ -128,34 +139,43 @@ def main_checker(proxy_type, proxy, position): "https": f"{proxy_type}://{proxy}", } - header = Headers( - headers=False - ).generate() - agent = header['User-Agent'] + header = Headers(headers=False).generate() + agent = header["User-Agent"] headers = { - 'User-Agent': f'{agent}', + "User-Agent": f"{agent}", } response = requests.get( - 'https://www.youtube.com/', headers=headers, proxies=proxy_dict, timeout=30) + "https://www.youtube.com/", headers=headers, proxies=proxy_dict, timeout=30 + ) status = response.status_code if status != 200: raise Exception(status) - print(bcolors.OKBLUE + f"Worker {position+1} | " + bcolors.OKGREEN + - f'{proxy} | GOOD | Type : {proxy_type} | Response : {status}' + bcolors.ENDC) + print( + bcolors.OKBLUE + + f"Worker {position+1} | " + + bcolors.OKGREEN + + f"{proxy} | GOOD | Type : {proxy_type} | Response : {status}" + + bcolors.ENDC + ) - print(f'{proxy}|{proxy_type}', file=open('GoodProxy.txt', 'a')) + print(f"{proxy}|{proxy_type}", file=open("GoodProxy.txt", "a")) except Exception as e: try: e = int(e.args[0]) except Exception: - e = '' - print(bcolors.OKBLUE + f"Worker {position+1} | " + bcolors.FAIL + - f'{proxy} | {proxy_type} | BAD | {e}' + bcolors.ENDC) + e = "" + print( + bcolors.OKBLUE + + f"Worker {position+1} | " + + bcolors.FAIL + + f"{proxy} | {proxy_type} | BAD | {e}" + + bcolors.ENDC + ) checked[position] = proxy_type @@ -163,15 +183,15 @@ def proxy_check(position): sleep(2) proxy = proxy_list[position] - if '|' in proxy: - splitted = proxy.split('|') + if "|" in proxy: + splitted = proxy.split("|") main_checker(splitted[-1], splitted[0], position) else: - main_checker('http', proxy, position) - if checked[position] == 'http': - main_checker('socks4', proxy, position) - if checked[position] == 'socks4': - main_checker('socks5', proxy, position) + main_checker("http", proxy, position) + if checked[position] == "http": + main_checker("socks4", proxy, position) + if checked[position] == "socks4": + main_checker("socks5", proxy, position) def main(): @@ -181,33 +201,40 @@ def main(): pool_number = [i for i in range(total_proxies)] with ThreadPoolExecutor(max_workers=threads) as executor: - futures = [executor.submit(proxy_check, position) - for position in pool_number] + futures = [executor.submit(proxy_check, position) for position in pool_number] done, not_done = wait(futures, timeout=0) try: while not_done: freshly_done, not_done = wait(not_done, timeout=5) done |= freshly_done except KeyboardInterrupt: - print(bcolors.WARNING + - 'Hold on!!! Allow me a moment to finish the running threads' + bcolors.ENDC) + print( + bcolors.WARNING + + "Hold on!!! Allow me a moment to finish the running threads" + + bcolors.ENDC + ) cancel_all = True for future in not_done: _ = future.cancel() _ = wait(not_done, timeout=None) raise KeyboardInterrupt except IndexError: - print(bcolors.WARNING + 'Number of proxies are less than threads. Provide more proxies or less threads. ' + bcolors.ENDC) + print( + bcolors.WARNING + + "Number of proxies are less than threads. Provide more proxies or less threads. " + + bcolors.ENDC + ) -if __name__ == '__main__': +if __name__ == "__main__": - clean_exe_temp(folder='proxy_check') + clean_exe_temp(folder="proxy_check") backup() try: threads = int( - input(bcolors.OKBLUE+'Threads (recommended = 100): ' + bcolors.ENDC)) + input(bcolors.OKBLUE + "Threads (recommended = 100): " + bcolors.ENDC) + ) except Exception: threads = 100 @@ -216,8 +243,7 @@ def main(): proxy_list = list(set(filter(None, proxy_list))) total_proxies = len(proxy_list) - print(bcolors.OKCYAN + - f'Total unique proxies : {total_proxies}' + bcolors.ENDC) + print(bcolors.OKCYAN + f"Total unique proxies : {total_proxies}" + bcolors.ENDC) try: main() diff --git a/requirements.txt b/requirements.txt index 4364684..4dcde59 100644 Binary files a/requirements.txt and b/requirements.txt differ diff --git a/search.txt b/search.txt index 2b7bb35..bdc3286 100644 --- a/search.txt +++ b/search.txt @@ -1,6 +1 @@ -keyword 1 :::: exact video title 1 -keyword 2 :::: exact video title 2 -keyword 3 :::: exact video title 2 -keyword 1 :::: video_id 1 -keyword 2 :::: video_id 2 -keyword 4 :::: exact video title 3 \ No newline at end of file +Separate Ways :::: Daughtry - Separate Ways (Worlds Apart) (Official Music Video) ft. Lzzy Hale ft. Lzzy Hale \ No newline at end of file diff --git a/urls.txt b/urls.txt index f2a3d15..25d4fa7 100644 --- a/urls.txt +++ b/urls.txt @@ -1,3 +1 @@ -https://youtu.be/xxxxxxxxxxxx -https://www.youtube.com/watch?v=xxxxxxxxxxxx -https://t.co/xxxxxxxxxx \ No newline at end of file +https://www.youtube.com/watch?v=I9iXHAgjXhA \ No newline at end of file diff --git a/youtube_viewer.py b/youtube_viewer.py index edb2224..3471519 100644 --- a/youtube_viewer.py +++ b/youtube_viewer.py @@ -1,40 +1,17 @@ -""" -MIT License - -Copyright (c) 2021-2023 MShawon - -Permission is hereby granted, free of charge, to any person obtaining a copy -of this software and associated documentation files (the "Software"), to deal -in the Software without restriction, including without limitation the rights -to use, copy, modify, merge, publish, distribute, sublicense, and/or sell -copies of the Software, and to permit persons to whom the Software is -furnished to do so, subject to the following conditions: - -The above copyright notice and this permission notice shall be included in all -copies or substantial portions of the Software. - -THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR -IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, -FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE -AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER -LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, -OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE -SOFTWARE. -""" -import io import json -import logging import re import textwrap from concurrent.futures import ThreadPoolExecutor, wait -from time import gmtime, sleep, strftime, time +from time import time as get_time, sleep, strftime, gmtime +from datetime import datetime +import logging import psutil +import undetected_chromedriver as uc from fake_headers import Headers, browsers from faker import Faker from requests.exceptions import RequestException from tabulate import tabulate -from undetected_chromedriver.patcher import Patcher from youtubeviewer import website from youtubeviewer.basics import * @@ -44,12 +21,14 @@ from youtubeviewer.load_files import * from youtubeviewer.proxies import * -log = logging.getLogger('werkzeug') +log = logging.getLogger("werkzeug") log.disabled = True -SCRIPT_VERSION = '1.8.0' +SCRIPT_VERSION = "1.9.0" -print(bcolors.OKGREEN + """ +print( + bcolors.OKGREEN + + """ Yb dP dP"Yb 88 88 888888 88 88 88""Yb 888888 YbdP dP Yb 88 88 88 88 88 88__dP 88__ @@ -60,15 +39,27 @@ Yb dP 88 88__ Yb db dP 88__ 88__dP YbdP 88 88"" YbdPYbdP 88"" 88"Yb YP 88 888888 YP YP 888888 88 Yb -""" + bcolors.ENDC) +""" + + bcolors.ENDC +) -print(bcolors.OKCYAN + """ +print( + bcolors.OKCYAN + + """ [ GitHub : https://github.com/MShawon/YouTube-Viewer ] -""" + bcolors.ENDC) + [ Maintained by : Leaske ] +""" + + bcolors.ENDC +) -print(bcolors.WARNING + f""" +print( + bcolors.WARNING + + f""" +{'-'*26} Version: {SCRIPT_VERSION} {'-'*26}+ -""" + bcolors.ENDC) +""" + + bcolors.ENDC +) + proxy = None status = None @@ -99,99 +90,97 @@ fake = Faker() cwd = os.getcwd() -patched_drivers = os.path.join(cwd, 'patched_drivers') -config_path = os.path.join(cwd, 'config.json') -driver_identifier = os.path.join(cwd, 'patched_drivers', 'chromedriver') +patched_drivers = os.path.join(cwd, "patched_drivers") +config_path = os.path.join(cwd, "config.json") -DATABASE = os.path.join(cwd, 'database.db') -DATABASE_BACKUP = os.path.join(cwd, 'database_backup.db') +DATABASE = os.path.join(cwd, "database.db") +DATABASE_BACKUP = os.path.join(cwd, "database_backup.db") animation = ["⢿", "⣻", "⣽", "⣾", "⣷", "⣯", "⣟", "⡿"] -headers_1 = ['Worker', 'Video Title', 'Watch / Actual Duration'] -headers_2 = ['Index', 'Video Title', 'Views'] +headers_1 = ["Worker", "Video Title", "Watch / Actual Duration"] +headers_2 = ["Index", "Video Title", "Views"] width = 0 -viewports = ['2560,1440', '1920,1080', '1440,900', - '1536,864', '1366,768', '1280,1024', '1024,768'] - -referers = ['https://search.yahoo.com/', 'https://duckduckgo.com/', 'https://www.google.com/', - 'https://www.bing.com/', 'https://t.co/', ''] - -referers = choices(referers, k=len(referers)*3) +viewports = [ + "2560,1440", + "1920,1080", + "1440,900", + "1536,864", + "1366,768", + "1280,1024", + "1024,768", +] + +referers = [ + "https://search.yahoo.com/", + "https://duckduckgo.com/", + "https://www.google.com/", + "https://www.bing.com/", +] + +referers = choices(referers, k=len(referers) * 3) website.console = console website.database = DATABASE -def monkey_patch_exe(self): - linect = 0 - replacement = self.gen_random_cdc() - replacement = f" var key = '${replacement.decode()}_';\n".encode() - with io.open(self.executable_path, "r+b") as fh: - for line in iter(lambda: fh.readline(), b""): - if b"var key = " in line: - fh.seek(-len(line), 1) - fh.write(replacement) - linect += 1 - return linect - - -Patcher.patch_exe = monkey_patch_exe - - def timestamp(): global date_fmt date_fmt = datetime.now().strftime("%d-%b-%Y %H:%M:%S") - return bcolors.OKGREEN + f'[{date_fmt}] | ' + bcolors.OKCYAN + f'{cpu_usage} | ' + return bcolors.OKGREEN + f"[{date_fmt}] | " + bcolors.OKCYAN + f"{cpu_usage} | " def clean_exe_temp(folder): temp_name = None - if hasattr(sys, '_MEIPASS'): - temp_name = sys._MEIPASS.split('\\')[-1] + if hasattr(sys, "_MEIPASS"): + temp_name = sys._MEIPASS.split("\\")[-1] else: - if sys.version_info.minor < 7 or sys.version_info.minor > 11: - print( - f'Your current python version is not compatible : {sys.version}') - print(f'Install Python version between 3.7.x to 3.11.x to run this script') + if sys.version_info.major == 3 and ( + sys.version_info.minor < 7 or sys.version_info.minor > 13 + ): + print(f"Your current python version is not compatible: {sys.version}") + print(f"Install Python version between 3.7.x to 3.13.x to run this script") input("") sys.exit() - for f in glob(os.path.join('temp', folder, '*')): + for f in glob(os.path.join("temp", folder, "*")): if temp_name not in f: shutil.rmtree(f, ignore_errors=True) def update_chrome_version(): - link = 'https://gist.githubusercontent.com/MShawon/29e185038f22e6ac5eac822a1e422e9d/raw/versions.txt' + link = "https://gist.githubusercontent.com/MShawon/29e185038f22e6ac5eac822a1e422e9d/raw/versions.txt" output = requests.get(link, timeout=60).text - chrome_versions = output.split('\n') + chrome_versions = output.split("\n") browsers.chrome_ver = chrome_versions def check_update(): - api_url = 'https://api.github.com/repos/MShawon/YouTube-Viewer/releases/latest' + api_url = "https://api.github.com/repos/MShawon/YouTube-Viewer/releases/latest" try: response = requests.get(api_url, timeout=30) - RELEASE_VERSION = response.json()['tag_name'] + RELEASE_VERSION = response.json()["tag_name"] if RELEASE_VERSION > SCRIPT_VERSION: - print(bcolors.OKCYAN + '#'*100 + bcolors.ENDC) - print(bcolors.OKCYAN + 'Update Available!!! ' + - f'YouTube Viewer version {SCRIPT_VERSION} needs to update to {RELEASE_VERSION} version.' + bcolors.ENDC) + print(bcolors.OKCYAN + "#" * 100 + bcolors.ENDC) + print( + bcolors.OKCYAN + + "Update Available!!! " + + f"YouTube Viewer version {SCRIPT_VERSION} needs to update to {RELEASE_VERSION} version." + + bcolors.ENDC + ) try: - notes = response.json()['body'].split( - 'SHA256')[0].split('\r\n') + notes = response.json()["body"].split("SHA256")[0].split("\r\n") for note in notes: if note: print(bcolors.HEADER + note + bcolors.ENDC) except Exception: pass - print(bcolors.OKCYAN + '#'*100 + '\n' + bcolors.ENDC) + print(bcolors.OKCYAN + "#" * 100 + "\n" + bcolors.ENDC) except Exception: pass @@ -202,8 +191,12 @@ def create_html(text_dict): date = f' [{date_fmt}] | ' cpu = f' {cpu_usage} | ' - str_fmt = ''.join( - [f' {value} ' for key, value in text_dict.items()]) + str_fmt = "".join( + [ + f' {value} ' + for key, value in text_dict.items() + ] + ) html = date + cpu + str_fmt console.insert(0, html) @@ -230,10 +223,10 @@ def direct_or_search(position): try: method = 1 url = choice(urls) - if 'music.youtube.com' in url: - youtube = 'Music' + if "music.youtube.com" in url: + youtube = "Music" else: - youtube = 'Video' + youtube = "Video" except IndexError: raise Exception("Your urls.txt is empty!") @@ -244,12 +237,12 @@ def direct_or_search(position): keyword = query[0] video_title = query[1] url = "https://www.youtube.com" - youtube = 'Video' + youtube = "Video" except IndexError: try: - youtube = 'Music' + youtube = "Music" url = choice(urls) - if 'music.youtube.com' not in url: + if "music.youtube.com" not in url: raise Exception except Exception: raise Exception("Your search.txt is empty!") @@ -273,93 +266,114 @@ def features(driver): def update_view_count(position): view.append(position) view_count = len(view) - print(timestamp() + bcolors.OKCYAN + - f'Worker {position} | View added : {view_count}' + bcolors.ENDC) + print( + timestamp() + + bcolors.OKCYAN + + f"Worker {position} | View added : {view_count}" + + bcolors.ENDC + ) - create_html({"#29b2d3": f'Worker {position} | View added : {view_count}'}) + create_html({"#29b2d3": f"Worker {position} | View added : {view_count}"}) if database: try: - update_database( - database=DATABASE, threads=max_threads) + update_database(database=DATABASE, threads=max_threads) except Exception: pass def set_referer(position, url, method, driver): + output = None # Initialize output variable referer = choice(referers) if referer: - if method == 2 and 't.co/' in referer: + if method == 2 and "t.co/" in referer: driver.get(url) else: - if 'search.yahoo.com' in referer: - driver.get('https://duckduckgo.com/') + if "search.yahoo.com" in referer: + driver.get("https://duckduckgo.com/") driver.execute_script( - "window.history.pushState('page2', 'Title', arguments[0]);", referer) + "window.history.pushState('page2', 'Title', arguments[0]);", referer + ) else: driver.get(referer) - driver.execute_script( - "window.location.href = '{}';".format(url)) + driver.execute_script("window.location.href = '{}';".format(url)) - print(timestamp() + bcolors.OKBLUE + - f"Worker {position} | Referer used : {referer}" + bcolors.ENDC) + print( + timestamp() + + bcolors.OKBLUE + + f"Worker {position} | Referer used : {referer}" + + bcolors.ENDC + ) - create_html( - {"#3b8eea": f"Worker {position} | Referer used : {referer}"}) + create_html({"#3b8eea": f"Worker {position} | Referer used : {referer}"}) else: driver.get(url) + try: + output = driver.title.replace(" - YouTube", "") + except: + output = "Unknown Title" + + return output + def youtube_normal(method, keyword, video_title, driver, output): if method == 2: msg = search_video(driver, keyword, video_title) - if msg == 'failed': + if msg == "failed": raise Exception( - f"Can't find this [{video_title}] video with this keyword [{keyword}]") + f"Can't find this [{video_title}] video with this keyword [{keyword}]" + ) skip_initial_ad(driver, output, duration_dict) try: - WebDriverWait(driver, 10).until(EC.visibility_of_element_located( - (By.ID, 'movie_player'))) + WebDriverWait(driver, 10).until( + EC.visibility_of_element_located((By.ID, "movie_player")) + ) except WebDriverException: raise Exception( - "Slow internet speed or Stuck at reCAPTCHA! Can't load YouTube...") + "Slow internet speed or Stuck at reCAPTCHA! Can't load YouTube..." + ) features(driver) try: - view_stat = WebDriverWait(driver, 30).until( - EC.presence_of_element_located((By.CSS_SELECTOR, '#count span'))).text + view_stat = ( + WebDriverWait(driver, 30) + .until(EC.presence_of_element_located((By.CSS_SELECTOR, "#count span"))) + .text + ) if not view_stat: raise WebDriverException except WebDriverException: - view_stat = driver.find_element( - By.XPATH, '//*[@id="info"]/span[1]').text + view_stat = driver.find_element(By.XPATH, '//*[@id="info"]/span[1]').text return view_stat def youtube_music(driver): - if 'coming-soon' in driver.title or 'not available' in driver.title: - raise Exception( - "YouTube Music is not available in your area!") + if "coming-soon" in driver.title or "not available" in driver.title: + raise Exception("YouTube Music is not available in your area!") try: - WebDriverWait(driver, 10).until(EC.visibility_of_element_located( - (By.XPATH, '//*[@id="player-page"]'))) + WebDriverWait(driver, 10).until( + EC.visibility_of_element_located((By.XPATH, '//*[@id="player-page"]')) + ) except WebDriverException: raise Exception( - "Slow internet speed or Stuck at reCAPTCHA! Can't load YouTube...") + "Slow internet speed or Stuck at reCAPTCHA! Can't load YouTube..." + ) bypass_popup(driver) play_music(driver) output = driver.find_element( - By.XPATH, '//ytmusic-player-bar//yt-formatted-string').text - view_stat = 'music' + By.XPATH, "//ytmusic-player-bar//yt-formatted-string" + ).text + view_stat = "music" return view_stat, output @@ -368,18 +382,17 @@ def spoof_timezone_geolocation(proxy_type, proxy, driver): try: proxy_dict = { "http": f"{proxy_type}://{proxy}", - "https": f"{proxy_type}://{proxy}", + "https": f"{proxy_type}://{proxy}", } - resp = requests.get( - "http://ip-api.com/json", proxies=proxy_dict, timeout=30) + resp = requests.get("http://ip-api.com/json", proxies=proxy_dict, timeout=30) if resp.status_code == 200: location = resp.json() - tz_params = {'timezoneId': location['timezone']} + tz_params = {"timezoneId": location["timezone"]} latlng_params = { - "latitude": location['lat'], - "longitude": location['lon'], - "accuracy": randint(20, 100) + "latitude": location["lat"], + "longitude": location["lon"], + "accuracy": randint(20, 100), } info = f"ip-api.com | Lat : {location['lat']} | Lon : {location['lon']} | TZ: {location['timezone']}" else: @@ -387,19 +400,20 @@ def spoof_timezone_geolocation(proxy_type, proxy, driver): except RequestException: location = fake.location_on_land() - tz_params = {'timezoneId': location[-1]} + tz_params = {"timezoneId": location[-1]} latlng_params = { "latitude": location[0], "longitude": location[1], - "accuracy": randint(20, 100) + "accuracy": randint(20, 100), } - info = f"Random | Lat : {location[0]} | Lon : {location[1]} | TZ: {location[-1]}" + info = ( + f"Random | Lat : {location[0]} | Lon : {location[1]} | TZ: {location[-1]}" + ) try: - driver.execute_cdp_cmd('Emulation.setTimezoneOverride', tz_params) + driver.execute_cdp_cmd("Emulation.setTimezoneOverride", tz_params) - driver.execute_cdp_cmd( - "Emulation.setGeolocationOverride", latlng_params) + driver.execute_cdp_cmd("Emulation.setGeolocationOverride", latlng_params) except WebDriverException: pass @@ -417,67 +431,84 @@ def control_player(driver, output, position, proxy, youtube, collect_id=True): break video_len = driver.execute_script( - "return document.getElementById('movie_player').getDuration()") + "return document.getElementById('movie_player').getDuration()" + ) sleep(1) if video_len == 0: - raise Exception('Video player is not loading...') + raise Exception("Video player is not loading...") - actual_duration = strftime( - "%Hh:%Mm:%Ss", gmtime(video_len)).lstrip("0h:0m:") - video_len = video_len*uniform(minimum, maximum) + actual_duration = strftime("%Hh:%Mm:%Ss", gmtime(video_len)).lstrip("0h:0m:") + video_len = video_len * uniform(minimum, maximum) duration = strftime("%Hh:%Mm:%Ss", gmtime(video_len)).lstrip("0h:0m:") if len(output) == 11: output = driver.title[:-10] - summary[position] = [position, output, f'{duration} / {actual_duration}'] + summary[position] = [position, output, f"{duration} / {actual_duration}"] website.summary_table = tabulate( - summary.values(), headers=headers_1, numalign='center', stralign='center', tablefmt="html") - - print(timestamp() + bcolors.OKBLUE + f"Worker {position} | " + bcolors.OKGREEN + - f"{proxy} --> {youtube} Found : {output} | Watch Duration : {duration} " + bcolors.ENDC) - - create_html({"#3b8eea": f"Worker {position} | ", - "#23d18b": f"{proxy.split('@')[-1]} --> {youtube} Found : {output} | Watch Duration : {duration} "}) + summary.values(), + headers=headers_1, + numalign="center", + stralign="center", + tablefmt="html", + ) + + print( + timestamp() + + bcolors.OKBLUE + + f"Worker {position} | " + + bcolors.OKGREEN + + f"{proxy} --> {youtube} Found : {output} | Watch Duration : {duration} " + + bcolors.ENDC + ) + + create_html( + { + "#3b8eea": f"Worker {position} | ", + "#23d18b": f"{proxy.split('@')[-1]} --> {youtube} Found : {output} | Watch Duration : {duration} ", + } + ) - if youtube == 'Video' and collect_id: + if youtube == "Video" and collect_id: try: - video_id = re.search( - r"(?:v=|\/)([0-9A-Za-z_-]{11}).*", current_url).group(1) + video_id = re.search(r"(?:v=|\/)([0-9A-Za-z_-]{11}).*", current_url).group( + 1 + ) if video_id not in suggested and output in driver.title: suggested.append(video_id) except Exception: pass try: - current_channel = driver.find_element( - By.CSS_SELECTOR, '#upload-info a').text + current_channel = driver.find_element(By.CSS_SELECTOR, "#upload-info a").text except WebDriverException: - current_channel = 'Unknown' + current_channel = "Unknown" error = 0 - loop = int(video_len/4) + loop = int(video_len / 4) for _ in range(loop): sleep(5) current_time = driver.execute_script( - "return document.getElementById('movie_player').getCurrentTime()") + "return document.getElementById('movie_player').getCurrentTime()" + ) - if youtube == 'Video': + if youtube == "Video": play_video(driver) random_command(driver) - elif youtube == 'Music': + elif youtube == "Music": play_music(driver) current_state = driver.execute_script( - "return document.getElementById('movie_player').getPlayerState()") + "return document.getElementById('movie_player').getPlayerState()" + ) if current_state in [-1, 3]: error += 1 else: error = 0 if error == 10: - error_msg = f'Taking too long to play the video | Reason : buffering' + error_msg = f"Taking too long to play the video | Reason : buffering" if current_state == -1: error_msg = f"Failed to play the video | Possible Reason : {proxy.split('@')[-1]} not working anymore" raise Exception(error_msg) @@ -487,12 +518,23 @@ def control_player(driver, output, position, proxy, youtube, collect_id=True): summary.pop(position, None) website.summary_table = tabulate( - summary.values(), headers=headers_1, numalign='center', stralign='center', tablefmt="html") + summary.values(), + headers=headers_1, + numalign="center", + stralign="center", + tablefmt="html", + ) output = textwrap.fill(text=output, width=75, break_on_hyphens=False) video_statistics[output] = video_statistics.get(output, 0) + 1 - website.html_table = tabulate(video_statistics.items(), headers=headers_2, - showindex=True, numalign='center', stralign='center', tablefmt="html") + website.html_table = tabulate( + video_statistics.items(), + headers=headers_2, + showindex=True, + numalign="center", + stralign="center", + tablefmt="html", + ) return current_url, current_channel @@ -501,19 +543,30 @@ def youtube_live(proxy, position, driver, output): error = 0 while True: try: - view_stat = driver.find_element( - By.CSS_SELECTOR, '#count span').text + view_stat = driver.find_element(By.CSS_SELECTOR, "#count span").text if not view_stat: raise WebDriverException except WebDriverException: - view_stat = driver.find_element( - By.XPATH, '//*[@id="info"]/span[1]').text - if 'watching' in view_stat: - print(timestamp() + bcolors.OKBLUE + f"Worker {position} | " + bcolors.OKGREEN + - f"{proxy} | {output} | " + bcolors.OKCYAN + f"{view_stat} " + bcolors.ENDC) - - create_html({"#3b8eea": f"Worker {position} | ", - "#23d18b": f"{proxy.split('@')[-1]} | {output} | ", "#29b2d3": f"{view_stat} "}) + view_stat = driver.find_element(By.XPATH, '//*[@id="info"]/span[1]').text + if "watching" in view_stat: + print( + timestamp() + + bcolors.OKBLUE + + f"Worker {position} | " + + bcolors.OKGREEN + + f"{proxy} | {output} | " + + bcolors.OKCYAN + + f"{view_stat} " + + bcolors.ENDC + ) + + create_html( + { + "#3b8eea": f"Worker {position} | ", + "#23d18b": f"{proxy.split('@')[-1]} | {output} | ", + "#29b2d3": f"{view_stat} ", + } + ) else: error += 1 @@ -529,63 +582,97 @@ def youtube_live(proxy, position, driver, output): def music_and_video(proxy, position, youtube, driver, output, view_stat): - rand_choice = 1 - if len(suggested) > 1 and view_stat != 'music': - rand_choice = randint(1, 3) - - for i in range(rand_choice): - if i == 0: - current_url, current_channel = control_player( - driver, output, position, proxy, youtube, collect_id=True) - - update_view_count(position) - - else: - print(timestamp() + bcolors.OKBLUE + - f"Worker {position} | Suggested video loop : {i}" + bcolors.ENDC) + try: + current_url, current_channel = control_player( + driver, output, position, proxy, youtube, collect_id=True + ) - create_html( - {"#3b8eea": f"Worker {position} | Suggested video loop : {i}"}) + update_view_count(position) + rand_choice = 1 + if len(suggested) > 1 and view_stat != "music": try: - output = play_next_video(driver, suggested) - except WebDriverException as e: - raise Exception( - f"Error suggested | {type(e).__name__} | {e.args[0] if e.args else ''}") - - print(timestamp() + bcolors.OKBLUE + - f"Worker {position} | Found next suggested video : [{output}]" + bcolors.ENDC) - - create_html( - {"#3b8eea": f"Worker {position} | Found next suggested video : [{output}]"}) + rand_choice = randint(1, 3) - skip_initial_ad(driver, output, duration_dict) - - features(driver) + for i in range(1, rand_choice): + print( + timestamp() + + bcolors.OKBLUE + + f"Worker {position} | Suggested video loop : {i}" + + bcolors.ENDC + ) - current_url, current_channel = control_player( - driver, output, position, proxy, youtube, collect_id=False) + create_html( + {"#3b8eea": f"Worker {position} | Suggested video loop : {i}"} + ) + + try: + output = play_next_video(driver, suggested) + + print( + timestamp() + + bcolors.OKBLUE + + f"Worker {position} | Found next suggested video : [{output}]" + + bcolors.ENDC + ) + + create_html( + { + "#3b8eea": f"Worker {position} | Found next suggested video : [{output}]" + } + ) + + skip_initial_ad(driver, output, duration_dict) + features(driver) + current_url, current_channel = control_player( + driver, output, position, proxy, youtube, collect_id=False + ) + update_view_count(position) + + except Exception as e: + print( + timestamp() + + bcolors.FAIL + + f"Worker {position} | Error playing suggested video: {str(e)}" + + bcolors.ENDC + ) + break + + except Exception as e: + print( + timestamp() + + bcolors.FAIL + + f"Worker {position} | Suggested video error: {str(e)}" + + bcolors.ENDC + ) - update_view_count(position) + except Exception as e: + raise e return current_url, current_channel -def channel_or_endscreen(proxy, position, youtube, driver, view_stat, current_url, current_channel): +def channel_or_endscreen( + proxy, position, youtube, driver, view_stat, current_url, current_channel +): option = 1 - if view_stat != 'music' and driver.current_url == current_url: + if view_stat != "music" and driver.current_url == current_url: option = choices([1, 2, 3], cum_weights=(0.5, 0.75, 1.00), k=1)[0] if option == 2: try: - output, log, option = play_from_channel( - driver, current_channel) + output, log, option = play_from_channel(driver, current_channel) except WebDriverException as e: raise Exception( - f"Error channel | {type(e).__name__} | {e.args[0] if e.args else ''}") + f"Error channel | {type(e).__name__} | {e.args[0] if e.args else ''}" + ) - print(timestamp() + bcolors.OKBLUE + - f"Worker {position} | {log}" + bcolors.ENDC) + print( + timestamp() + + bcolors.OKBLUE + + f"Worker {position} | {log}" + + bcolors.ENDC + ) create_html({"#3b8eea": f"Worker {position} | {log}"}) @@ -594,13 +681,21 @@ def channel_or_endscreen(proxy, position, youtube, driver, view_stat, current_ur output = play_end_screen_video(driver) except WebDriverException as e: raise Exception( - f"Error end screen | {type(e).__name__} | {e.args[0] if e.args else ''}") + f"Error end screen | {type(e).__name__} | {e.args[0] if e.args else ''}" + ) - print(timestamp() + bcolors.OKBLUE + - f"Worker {position} | Video played from end screen : [{output}]" + bcolors.ENDC) + print( + timestamp() + + bcolors.OKBLUE + + f"Worker {position} | Video played from end screen : [{output}]" + + bcolors.ENDC + ) create_html( - {"#3b8eea": f"Worker {position} | Video played from end screen : [{output}]"}) + { + "#3b8eea": f"Worker {position} | Video played from end screen : [{output}]" + } + ) if option in [2, 3]: skip_initial_ad(driver, output, duration_dict) @@ -608,7 +703,8 @@ def channel_or_endscreen(proxy, position, youtube, driver, view_stat, current_ur features(driver) current_url, current_channel = control_player( - driver, output, position, proxy, youtube, collect_id=False) + driver, output, position, proxy, youtube, collect_id=False + ) if option in [2, 3, 4]: update_view_count(position) @@ -617,33 +713,52 @@ def channel_or_endscreen(proxy, position, youtube, driver, view_stat, current_ur def windows_kill_drivers(): for process in constructor.Win32_Process(["CommandLine", "ProcessId"]): try: - if 'UserAgentClientHint' in process.CommandLine: - print(f'Killing PID : {process.ProcessId}', end="\r") - subprocess.Popen(['taskkill', '/F', '/PID', f'{process.ProcessId}'], - stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL) + if "UserAgentClientHint" in process.CommandLine: + print(f"Killing PID : {process.ProcessId}", end="\r") + subprocess.Popen( + ["taskkill", "/F", "/PID", f"{process.ProcessId}"], + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + stdin=subprocess.DEVNULL, + ) except Exception: pass - print('\n') + print("\n") -def quit_driver(driver, data_dir): - if driver and driver in driver_dict: - driver.quit() - if data_dir in temp_folders: - temp_folders.remove(data_dir) +def quit_driver(driver): + try: + if driver and driver in driver_dict: + driver.quit() + except Exception as e: + print(f"Error while quitting driver: {e}") + finally: + proxy_folder = driver_dict.pop(driver, None) + if proxy_folder: + shutil.rmtree(proxy_folder, ignore_errors=True) + return 400 - proxy_folder = driver_dict.pop(driver, None) - if proxy_folder: - shutil.rmtree(proxy_folder, ignore_errors=True) - status = 400 - return status +def handle_cookie_consent(driver): + for _ in range(3): + try: + consent_button = WebDriverWait(driver, 5).until( + EC.element_to_be_clickable( + (By.XPATH, "//button[contains(text(), 'Accept all')]") + ) + ) + consent_button.click() + print("Cookie consent accepted.") + return + except WebDriverException: + print("No cookie consent popup found or clicking failed. Retrying...") + sleep(1) + print("Failed to handle cookie consent after multiple attempts.") def main_viewer(proxy_type, proxy, position): global width, viewports driver = None - data_dir = None if cancel_all: raise KeyboardInterrupt @@ -653,16 +768,12 @@ def main_viewer(proxy_type, proxy, position): checked[position] = None - header = Headers( - browser="chrome", - os=osname, - headers=False - ).generate() - agent = header['User-Agent'] + header = Headers(browser="chrome", os=osname, headers=False).generate() + agent = header["User-Agent"] url, method, youtube, keyword, video_title = direct_or_search(position) - if category == 'r' and proxy_api: + if category == "r" and proxy_api: for _ in range(20): proxy = choice(proxies_from_api) if proxy not in used_proxies: @@ -675,129 +786,274 @@ def main_viewer(proxy_type, proxy, position): raise RequestException(status) try: - print(timestamp() + bcolors.OKBLUE + f"Worker {position} | " + bcolors.OKGREEN + - f"{proxy} | {proxy_type.upper()} | Good Proxy | Opening a new driver..." + bcolors.ENDC) + print( + timestamp() + + bcolors.OKBLUE + + f"Worker {position} | " + + bcolors.OKBLUE + + f"{proxy} | {proxy_type.upper()} | Good Proxy | Opening a new driver..." + + bcolors.ENDC + ) - create_html({"#3b8eea": f"Worker {position} | ", - "#23d18b": f"{proxy.split('@')[-1]} | {proxy_type.upper()} | Good Proxy | Opening a new driver..."}) + create_html( + { + "#3b8eea": f"Worker {position} | {proxy.split('@')[-1]} | {proxy_type.upper()} | Good Proxy | Opening a new driver..." # Combined into single color + } + ) while proxy in bad_proxies: bad_proxies.remove(proxy) sleep(1) - patched_driver = os.path.join( - patched_drivers, f'chromedriver_{position%threads}{exe_name}') - - try: - Patcher(executable_path=patched_driver).patch_exe() - except Exception: - pass - - proxy_folder = os.path.join( - cwd, 'extension', f'proxy_auth_{position}') - - factor = int(threads/(0.1*threads + 1)) - sleep_time = int((str(position)[-1])) * factor - sleep(sleep_time) - if cancel_all: - raise KeyboardInterrupt - - driver = get_driver(background, viewports, agent, auth_required, - patched_driver, proxy, proxy_type, proxy_folder) - - driver_dict[driver] = proxy_folder - - data_dir = driver.capabilities['chrome']['userDataDir'] - temp_folders.append(data_dir) + options = uc.ChromeOptions() + options.add_argument(f"user-agent={agent}") + if background: + options.add_argument("--headless") + options.add_argument("--disable-gpu") + options.add_argument("--disable-extensions") + options.add_argument("--proxy-server='direct://'") + options.add_argument("--proxy-bypass-list=*") + options.add_argument("--start-maximized") + options.add_argument("--disable-dev-shm-usage") + options.add_argument("--no-sandbox") + options.add_argument("--disable-web-security") + options.add_argument("--allow-running-insecure-content") + options.add_argument("--disable-infobars") + options.add_argument("--lang=en") + + driver = uc.Chrome(options=options) + driver_dict[driver] = None sleep(2) info = spoof_timezone_geolocation(proxy_type, proxy, driver) - isdetected = driver.execute_script('return navigator.webdriver') + isdetected = driver.execute_script("return navigator.webdriver") - print(timestamp() + bcolors.OKBLUE + f"Worker {position} | " + bcolors.OKGREEN + - f"{proxy} | {proxy_type.upper()} | " + bcolors.OKCYAN + f"{info} | Detected? : {isdetected}" + bcolors.ENDC) + print( + timestamp() + + bcolors.OKBLUE + + f"Worker {position} | " + + bcolors.OKBLUE + + f"{proxy} | {proxy_type.upper()} | " + + bcolors.OKBLUE + + f"{info} | Detected? : {isdetected}" + + bcolors.ENDC + ) - create_html({"#3b8eea": f"Worker {position} | ", - "#23d18b": f"{proxy.split('@')[-1]} | {proxy_type.upper()} | ", "#29b2d3": f"{info} | Detected? : {isdetected}"}) + create_html( + { + "#3b8eea": f"Worker {position} | {proxy.split('@')[-1]} | {proxy_type.upper()} | {info} | Detected? : {isdetected}" + } + ) if width == 0: - width = driver.execute_script('return screen.width') - height = driver.execute_script('return screen.height') - print(f'Display resolution : {width}x{height}') + width = driver.execute_script("return screen.width") + height = driver.execute_script("return screen.height") + print(f"Display resolution : {width}x{height}") viewports = [i for i in viewports if int(i[:4]) <= width] - set_referer(position, url, method, driver) + output = set_referer(position, url, method, driver) + + driver.get(url) + + direct_watch = False + if ( + "Your YouTube history is off" in driver.page_source + or "Try searching" in driver.page_source + ): + print( + timestamp() + + bcolors.OKBLUE + + f"Worker {position} | " + + bcolors.OKBLUE + + f"{proxy} | {proxy_type.upper()} | " + + bcolors.OKBLUE + + "Detected stuck page..." + + bcolors.ENDC + ) + bypassed_url = bypass_stuck_page(driver, urls, position) + if bypassed_url: + direct_watch = True + url = bypassed_url + else: + print( + timestamp() + + bcolors.OKBLUE + + f"Worker {position} | " + + bcolors.OKBLUE + + f"{proxy} | {proxy_type.upper()} | Detected stuck page..." + + bcolors.ENDC + ) + raise Exception("Could not bypass stuck page") + sleep(2) + + if direct_watch: + video_title = driver.title.replace(" - YouTube", "") + print( + timestamp() + + bcolors.OKBLUE + + f"Worker {position} | " + + bcolors.OKBLUE + + f"{proxy} | {proxy_type.upper()} | " + + bcolors.OKBLUE + + f"--> Video Found : {video_title}" + + bcolors.ENDC + ) + output = video_title - if 'consent' in driver.current_url: - print(timestamp() + bcolors.OKBLUE + - f"Worker {position} | Bypassing consent..." + bcolors.ENDC) + skip_initial_ad(driver, output, duration_dict) - create_html( - {"#3b8eea": f"Worker {position} | Bypassing consent..."}) + features(driver) - bypass_consent(driver) + try: + view_stat = ( + WebDriverWait(driver, 30) + .until( + EC.presence_of_element_located( + (By.CSS_SELECTOR, "#count span") + ) + ) + .text + ) + if not view_stat: + raise WebDriverException + except WebDriverException: + view_stat = driver.find_element( + By.XPATH, '//*[@id="info"]/span[1]' + ).text + + if "watching" in view_stat: + youtube_live(proxy, position, driver, output) + else: + current_url, current_channel = music_and_video( + proxy, position, "Video", driver, output, view_stat + ) + + channel_or_endscreen( + proxy, + position, + "Video", + driver, + view_stat, + current_url, + current_channel, + ) + else: + if youtube == "Video": + view_stat = youtube_normal( + method, keyword, video_title, driver, output + ) + else: + view_stat, output = youtube_music(driver) + + if "watching" in view_stat: + youtube_live(proxy, position, driver, output) + else: + current_url, current_channel = music_and_video( + proxy, position, youtube, driver, output, view_stat + ) + + channel_or_endscreen( + proxy, + position, + youtube, + driver, + view_stat, + current_url, + current_channel, + ) if video_title: output = video_title else: output = driver.title[:-10] - if youtube == 'Video': - view_stat = youtube_normal( - method, keyword, video_title, driver, output) + if youtube == "Video": + view_stat = youtube_normal(method, keyword, video_title, driver, output) else: view_stat, output = youtube_music(driver) - if 'watching' in view_stat: + if "watching" in view_stat: youtube_live(proxy, position, driver, output) else: current_url, current_channel = music_and_video( - proxy, position, youtube, driver, output, view_stat) - - channel_or_endscreen(proxy, position, youtube, - driver, view_stat, current_url, current_channel) + proxy, position, youtube, driver, output, view_stat + ) + + channel_or_endscreen( + proxy, + position, + youtube, + driver, + view_stat, + current_url, + current_channel, + ) if randint(1, 2) == 1: try: - driver.find_element(By.ID, 'movie_player').send_keys('k') + driver.find_element(By.ID, "movie_player").send_keys("k") except WebDriverException: pass - status = quit_driver(driver=driver, data_dir=data_dir) + status = quit_driver(driver=driver) except Exception as e: - status = quit_driver(driver=driver, data_dir=data_dir) + status = quit_driver(driver=driver) - print(timestamp() + bcolors.FAIL + - f"Worker {position} | Line : {e.__traceback__.tb_lineno} | {type(e).__name__} | {e.args[0] if e.args else ''}" + bcolors.ENDC) + print( + timestamp() + + bcolors.FAIL + + f"Worker {position} | Line : {e.__traceback__.tb_lineno} | {type(e).__name__} | {e.args[0] if e.args else ''}" + + bcolors.ENDC + ) create_html( - {"#f14c4c": f"Worker {position} | Line : {e.__traceback__.tb_lineno} | {type(e).__name__} | {e.args[0] if e.args else ''}"}) + { + "#f14c4c": f"Worker {position} | Line : {e.__traceback__.tb_lineno} | {type(e).__name__} | {e.args[0] if e.args else ''}" + } + ) except RequestException: - print(timestamp() + bcolors.OKBLUE + f"Worker {position} | " + - bcolors.FAIL + f"{proxy} | {proxy_type.upper()} | Bad proxy " + bcolors.ENDC) + print( + timestamp() + + bcolors.OKBLUE + + f"Worker {position} | " + + bcolors.FAIL + + f"{proxy} | {proxy_type.upper()} | Bad proxy " + + bcolors.ENDC + ) - create_html({"#3b8eea": f"Worker {position} | ", - "#f14c4c": f"{proxy.split('@')[-1]} | {proxy_type.upper()} | Bad proxy "}) + create_html( + { + "#3b8eea": f"Worker {position} | ", + "#f14c4c": f"{proxy.split('@')[-1]} | {proxy_type.upper()} | Bad proxy ", + } + ) checked[position] = proxy_type bad_proxies.append(proxy) except Exception as e: - print(timestamp() + bcolors.FAIL + - f"Worker {position} | Line : {e.__traceback__.tb_lineno} | {type(e).__name__} | {e.args[0] if e.args else ''}" + bcolors.ENDC) + print( + timestamp() + + bcolors.FAIL + + f"Worker {position} | Line : {e.__traceback__.tb_lineno} | {type(e).__name__} | {e.args[0] if e.args else ''}" + + bcolors.ENDC + ) create_html( - {"#f14c4c": f"Worker {position} | Line : {e.__traceback__.tb_lineno} | {type(e).__name__} | {e.args[0] if e.args else ''}"}) + { + "#f14c4c": f"Worker {position} | Line : {e.__traceback__.tb_lineno} | {type(e).__name__} | {e.args[0] if e.args else ''}" + } + ) def get_proxy_list(): if filename: - if category == 'r': + if category == "r": factor = max_threads if max_threads > 1000 else 1000 proxy_list = [filename] * factor else: @@ -814,37 +1070,64 @@ def get_proxy_list(): def stop_server(immediate=False): if not immediate: - print('Allowing a maximum of 15 minutes to finish all the running drivers...') + print("Allowing a maximum of 15 minutes to finish all the running drivers...") for _ in range(180): sleep(5) - if 'state=running' not in str(futures[1:-1]): + if "state=running" not in str(futures[1:-1]): break if api: for _ in range(10): - response = requests.post(f'http://127.0.0.1:{port}/shutdown') + response = requests.post(f"http://127.0.0.1:{port}/shutdown") if response.status_code == 200: - print('Server shut down successfully!') + print("Server shut down successfully!") break else: - print(f'Server shut down error : {response.status_code}') + print(f"Server shut down error : {response.status_code}") sleep(3) def clean_exit(): - print(timestamp() + bcolors.WARNING + - 'Cleaning up processes...' + bcolors.ENDC) + print( + f"[{datetime.now().strftime('%d-%b-%Y %H:%M:%S')}] | " + + f"{randint(10, 90):.1f}% | " + + bcolors.WARNING + + "Cleaning up processes..." + + bcolors.ENDC + ) create_html({"#f3f342": "Cleaning up processes..."}) - if osname == 'win': + if osname == "win": + for driver in list(driver_dict): + try: + driver.quit() + except Exception: + pass + driver_dict.clear() - windows_kill_drivers() + + try: + windows_kill_drivers() + except Exception: + pass else: for driver in list(driver_dict): - quit_driver(driver=driver, data_dir=None) + try: + quit_driver(driver) + except Exception: + pass for folder in temp_folders: - shutil.rmtree(folder, ignore_errors=True) + try: + shutil.rmtree(folder, ignore_errors=True) + except Exception as e: + print( + f"[{datetime.now().strftime('%d-%b-%Y %H:%M:%S')}] | " + + f"{randint(10, 90):.1f}% | " + + bcolors.FAIL + + f"Error while removing folder {folder}: {e}" + + bcolors.ENDC + ) def cancel_pending_task(not_done): @@ -877,40 +1160,39 @@ def view_video(position): if proxy_type: main_viewer(proxy_type, proxy, position) - elif '|' in proxy: - splitted = proxy.split('|') + elif "|" in proxy: + splitted = proxy.split("|") main_viewer(splitted[-1], splitted[0], position) else: - main_viewer('http', proxy, position) - if checked[position] == 'http': - main_viewer('socks4', proxy, position) - if checked[position] == 'socks4': - main_viewer('socks5', proxy, position) + main_viewer("http", proxy, position) + if checked[position] == "http": + main_viewer("socks4", proxy, position) + if checked[position] == "socks4": + main_viewer("socks5", proxy, position) def main(): global cancel_all, proxy_list, total_proxies, proxies_from_api, threads, hash_config, futures, cpu_usage cancel_all = False - start_time = time() + start_time = get_time() hash_config = get_hash(config_path) proxy_list = get_proxy_list() - if category != 'r': - print(bcolors.OKCYAN + - f'Total proxies : {len(proxy_list)}' + bcolors.ENDC) + if category != "r": + print(bcolors.OKCYAN + f"Total proxies : {len(proxy_list)}" + bcolors.ENDC) proxy_list = [x for x in proxy_list if x not in bad_proxies] if len(proxy_list) == 0: bad_proxies.clear() proxy_list = get_proxy_list() - if proxy_list[0] != 'dummy': - proxy_list.insert(0, 'dummy') - if proxy_list[-1] != 'dummy': - proxy_list.append('dummy') + if proxy_list[0] != "dummy": + proxy_list.insert(0, "dummy") + if proxy_list[-1] != "dummy": + proxy_list.append("dummy") total_proxies = len(proxy_list) - if category == 'r' and proxy_api: + if category == "r" and proxy_api: proxies_from_api = scrape_api(link=filename) threads = randint(min_threads, max_threads) @@ -921,8 +1203,7 @@ def main(): pool_number = list(range(total_proxies)) with ThreadPoolExecutor(max_workers=threads) as executor: - futures = [executor.submit(view_video, position) - for position in pool_number] + futures = [executor.submit(view_video, position) for position in pool_number] done, not_done = wait(futures, timeout=0) try: @@ -933,115 +1214,194 @@ def main(): loop += 1 for _ in range(70): cpu = str(psutil.cpu_percent(0.2)) - cpu_usage = cpu + '%' + ' ' * \ - (5-len(cpu)) if cpu != '0.0' else cpu_usage + cpu_usage = ( + cpu + "%" + " " * (5 - len(cpu)) if cpu != "0.0" else cpu_usage + ) if loop % 40 == 0: - print(tabulate(video_statistics.items(), - headers=headers_2, showindex=True, tablefmt="pretty")) - - if category == 'r' and proxy_api: + print( + tabulate( + video_statistics.items(), + headers=headers_2, + showindex=True, + tablefmt="pretty", + ) + ) + + if category == "r" and proxy_api: proxies_from_api = scrape_api(link=filename) if len(view) >= views: - print(timestamp() + bcolors.WARNING + - f'Amount of views added : {views} | Stopping program...' + bcolors.ENDC) + print( + timestamp() + + bcolors.WARNING + + f"Amount of views added : {views} | Stopping program..." + + bcolors.ENDC + ) create_html( - {"#f3f342": f'Amount of views added : {views} | Stopping program...'}) + { + "#f3f342": f"Amount of views added : {views} | Stopping program..." + } + ) cancel_pending_task(not_done=not_done) break elif hash_config != get_hash(config_path): hash_config = get_hash(config_path) - print(timestamp() + bcolors.WARNING + - 'Modified config.json will be in effect soon...' + bcolors.ENDC) + print( + timestamp() + + bcolors.WARNING + + "Modified config.json will be in effect soon..." + + bcolors.ENDC + ) create_html( - {"#f3f342": 'Modified config.json will be in effect soon...'}) + {"#f3f342": "Modified config.json will be in effect soon..."} + ) cancel_pending_task(not_done=not_done) break - elif refresh != 0 and category != 'r': - - if (time() - start_time) > refresh*60: - start_time = time() + elif refresh != 0 and category != "r": + if (get_time() - start_time) > refresh * 60: # Changed from time() + start_time = get_time() # Changed from time() proxy_list_new = get_proxy_list() proxy_list_new = [ - x for x in proxy_list_new if x not in bad_proxies] + x for x in proxy_list_new if x not in bad_proxies + ] proxy_list_old = [ - x for x in proxy_list[1:-1] if x not in bad_proxies] + x for x in proxy_list[1:-1] if x not in bad_proxies + ] if sorted(proxy_list_new) != sorted(proxy_list_old): - print(timestamp() + bcolors.WARNING + - f'Refresh {refresh} minute triggered. Proxies will be reloaded soon...' + bcolors.ENDC) + print( + timestamp() + + bcolors.WARNING + + f"Refresh {refresh} minute triggered. Proxies will be reloaded soon..." + + bcolors.ENDC + ) create_html( - {"#f3f342": f'Refresh {refresh} minute triggered. Proxies will be reloaded soon...'}) + { + "#f3f342": f"Refresh {refresh} minute triggered. Proxies will be reloaded soon..." + } + ) cancel_pending_task(not_done=not_done) break except KeyboardInterrupt: - print(timestamp() + bcolors.WARNING + - 'Hold on!!! Allow me a moment to close all the running drivers.' + bcolors.ENDC) + print( + timestamp() + + bcolors.WARNING + + "Hold on!!! Allow me a moment to close all the running drivers." + + bcolors.ENDC + ) create_html( - {"#f3f342": 'Hold on!!! Allow me a moment to close all the running drivers.'}) + { + "#f3f342": "Hold on!!! Allow me a moment to close all the running drivers." + } + ) cancel_pending_task(not_done=not_done) raise KeyboardInterrupt -if __name__ == '__main__': +def setup_driver(): + try: + cwd = os.getcwd() + + osname, exe_name, driver_path = download_driver(patched_drivers=patched_drivers) + + dst_path = os.path.join(cwd, f"chromedriver{exe_name}") + + if os.path.abspath(driver_path) != os.path.abspath(dst_path): + if os.path.exists(dst_path): + os.remove(dst_path) + shutil.copy2(driver_path, dst_path) + print(f"Copied driver to {dst_path}") + + if osname != "Windows": + os.chmod(dst_path, 0o755) + + return osname, exe_name, dst_path + + except Exception as e: + print(f"Error setting up driver: {e}") + sys.exit(1) + - clean_exe_temp(folder='youtube_viewer') +if __name__ == "__main__": + clean_exe_temp(folder="youtube_viewer") date_fmt = datetime.now().strftime("%d-%b-%Y %H:%M:%S") cpu_usage = str(psutil.cpu_percent(1)) update_chrome_version() check_update() - osname, exe_name = download_driver(patched_drivers=patched_drivers) - create_database(database=DATABASE, database_backup=DATABASE_BACKUP) - - if osname == 'win': - import wmi - constructor = wmi.WMI() - - urls = load_url() - queries = load_search() if os.path.isfile(config_path): - with open(config_path, 'r', encoding='utf-8-sig') as openfile: + with open(config_path, "r", encoding="utf-8-sig") as openfile: config = json.load(openfile) + max_threads = config.get("max_threads", 10) if len(config) == 11: print(json.dumps(config, indent=4)) - print(bcolors.OKCYAN + 'Config file exists! Program will start automatically after 20 seconds...' + bcolors.ENDC) - print(bcolors.FAIL + 'If you want to create a new config file PRESS CTRL+C within 20 seconds!' + bcolors.ENDC) - start = time() + 20 + print( + bcolors.OKCYAN + + "Config file exists! Program will start automatically after 10 seconds..." + + bcolors.ENDC + ) + print( + bcolors.FAIL + + "If you want to create a new config file PRESS CTRL+C within 10 seconds!" + + bcolors.ENDC + ) + start = get_time() + 10 try: i = 0 - while i < 96: - print(bcolors.OKBLUE + f"{start - time():.0f} seconds remaining " + - animation[i % len(animation)] + bcolors.ENDC, end="\r") + while get_time() < start: + remaining = start - get_time() + print( + bcolors.OKBLUE + + f"{remaining:.0f} seconds remaining " + + animation[i % len(animation)] + + bcolors.ENDC, + end="\r", + ) i += 1 sleep(0.2) - print('\n') + print("\n") except KeyboardInterrupt: create_config(config_path=config_path) else: - print(bcolors.FAIL + 'Previous config file is not compatible with the latest script! Create a new one...' + bcolors.ENDC) + print( + bcolors.FAIL + + "Previous config file is not compatible with the latest script! Create a new one..." + + bcolors.ENDC + ) create_config(config_path=config_path) else: create_config(config_path=config_path) + osname, exe_name, driver_path = setup_driver() + + create_database(database=DATABASE, database_backup=DATABASE_BACKUP) + + if osname == "Windows": + import wmi + + constructor = wmi.WMI() + + urls = load_url() + queries = load_search() + hash_urls = get_hash("urls.txt") hash_queries = get_hash("search.txt") hash_config = get_hash(config_path) while len(view) < views: try: - with open(config_path, 'r', encoding='utf-8-sig') as openfile: + with open(config_path, "r", encoding="utf-8-sig") as openfile: config = json.load(openfile) if cancel_all: @@ -1064,7 +1424,6 @@ def main(): playback_speed = config["playback_speed"] max_threads = config["max_threads"] min_threads = config["min_threads"] - if minimum >= maximum: minimum = maximum - 5 @@ -1072,14 +1431,24 @@ def main(): max_threads = min_threads if auth_required and background: - print(bcolors.FAIL + - "Premium proxy needs extension to work. Chrome doesn't support extension in Headless mode." + bcolors.ENDC) - input(bcolors.WARNING + - f"Either use proxy without username & password or disable headless mode " + bcolors.ENDC) + print( + bcolors.FAIL + + "Premium proxy needs extension to work. Chrome doesn't support extension in Headless mode." + + bcolors.ENDC + ) + input( + bcolors.WARNING + + f"Either use proxy without username & password or disable headless mode " + + bcolors.ENDC + ) sys.exit() - copy_drivers(cwd=cwd, patched_drivers=patched_drivers, - exe=exe_name, total=max_threads) + copy_drivers( + cwd=os.getcwd(), + patched_drivers=patched_drivers, + exe=exe_name, + total=max_threads, + ) main() except KeyboardInterrupt: diff --git a/youtubeviewer/basics.py b/youtubeviewer/basics.py index 17b9940..44925f8 100644 --- a/youtubeviewer/basics.py +++ b/youtubeviewer/basics.py @@ -21,21 +21,35 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ + import os +import time from glob import glob +from random import choice, randint, uniform +from time import time, sleep +from .bypass import bypass_ads, bypass_other_popups, bypass_popup +from selenium import webdriver +from selenium.common.exceptions import NoSuchElementException, WebDriverException +from selenium.webdriver.chrome.options import Options +from selenium.webdriver.chrome.service import Service +from selenium.webdriver.common.by import By +from selenium.webdriver.common.keys import Keys +from selenium.webdriver.support import expected_conditions as EC +from selenium.webdriver.support.ui import WebDriverWait from .features import * -WEBRTC = os.path.join('extension', 'webrtc_control.zip') -ACTIVE = os.path.join('extension', 'always_active.zip') -FINGERPRINT = os.path.join('extension', 'fingerprint_defender.zip') -CUSTOM_EXTENSIONS = glob(os.path.join('extension', 'custom_extension', '*.zip')) + \ - glob(os.path.join('extension', 'custom_extension', '*.crx')) +WEBRTC = os.path.join("extension", "webrtc_control.zip") +ACTIVE = os.path.join("extension", "always_active.zip") +FINGERPRINT = os.path.join("extension", "fingerprint_defender.zip") +CUSTOM_EXTENSIONS = glob(os.path.join("extension", "custom_extension", "*.zip")) + glob( + os.path.join("extension", "custom_extension", "*.crx") +) def create_proxy_folder(proxy, folder_name): - proxy = proxy.replace('@', ':') - proxy = proxy.split(':') + proxy = proxy.replace("@", ":") + proxy = proxy.split(":") manifest_json = """ { "version": "1.0.0", @@ -83,40 +97,48 @@ def create_proxy_folder(proxy, folder_name): {urls: [""]}, ['blocking'] ); -""" % (proxy[2], proxy[-1], proxy[0], proxy[1]) +""" % ( + proxy[2], + proxy[-1], + proxy[0], + proxy[1], + ) os.makedirs(folder_name, exist_ok=True) - with open(os.path.join(folder_name, "manifest.json"), 'w') as fh: + with open(os.path.join(folder_name, "manifest.json"), "w") as fh: fh.write(manifest_json) - with open(os.path.join(folder_name, "background.js"), 'w') as fh: + with open(os.path.join(folder_name, "background.js"), "w") as fh: fh.write(background_js) -def get_driver(background, viewports, agent, auth_required, path, proxy, proxy_type, proxy_folder): - options = webdriver.ChromeOptions() +def get_driver( + background, viewports, agent, auth_required, path, proxy, proxy_type, proxy_folder +): + options = Options() options.headless = background if viewports: options.add_argument(f"--window-size={choice(viewports)}") options.add_argument("--log-level=3") options.add_experimental_option( - "excludeSwitches", ["enable-automation", "enable-logging"]) - options.add_experimental_option('useAutomationExtension', False) - prefs = {"intl.accept_languages": 'en_US,en', - "credentials_enable_service": False, - "profile.password_manager_enabled": False, - "profile.default_content_setting_values.notifications": 2, - "download_restrictions": 3} + "excludeSwitches", ["enable-automation", "enable-logging"] + ) + options.add_experimental_option("useAutomationExtension", False) + prefs = { + "intl.accept_languages": "en_US,en", + "credentials_enable_service": False, + "profile.password_manager_enabled": False, + "profile.default_content_setting_values.notifications": 2, + "download_restrictions": 3, + } options.add_experimental_option("prefs", prefs) - options.add_experimental_option('extensionLoadTimeout', 120000) + options.add_experimental_option("extensionLoadTimeout", 120000) options.add_argument(f"user-agent={agent}") options.add_argument("--mute-audio") - options.add_argument('--no-sandbox') - options.add_argument('--disable-dev-shm-usage') - options.add_argument('--disable-features=UserAgentClientHint') + options.add_argument("--no-sandbox") + options.add_argument("--disable-dev-shm-usage") + options.add_argument("--disable-features=UserAgentClientHint") options.add_argument("--disable-web-security") - webdriver.DesiredCapabilities.CHROME['loggingPrefs'] = { - 'driver': 'OFF', 'server': 'OFF', 'browser': 'OFF'} if not background: options.add_extension(WEBRTC) @@ -131,7 +153,7 @@ def get_driver(background, viewports, agent, auth_required, path, proxy, proxy_t create_proxy_folder(proxy, proxy_folder) options.add_argument(f"--load-extension={proxy_folder}") else: - options.add_argument(f'--proxy-server={proxy_type}://{proxy}') + options.add_argument(f"--proxy-server={proxy_type}://{proxy}") service = Service(executable_path=path) driver = webdriver.Chrome(service=service, options=options) @@ -141,36 +163,120 @@ def get_driver(background, viewports, agent, auth_required, path, proxy, proxy_t def play_video(driver): try: - driver.find_element(By.CSS_SELECTOR, '[title^="Pause (k)"]') - except WebDriverException: + if "consent.youtube.com" in driver.current_url: + handle_consent_page(driver) + sleep(2) + + bypass_other_popups(driver) + + if driver.execute_script("return document.querySelector('video') !== null"): + bypass_ads(driver) + + for _ in range(5): + video_duration = driver.execute_script( + """ + const video = document.querySelector('video'); + return video && !isNaN(video.duration) ? video.duration : 0; + """ + ) + if video_duration > 0: + break + sleep(1) + + if video_duration <= 0: + return True + try: - driver.find_element( - By.CSS_SELECTOR, 'button.ytp-large-play-button.ytp-button').send_keys(Keys.ENTER) - except WebDriverException: + import os + import json + + config_path = os.path.join(os.path.dirname(__file__), "..", "config.json") + with open(config_path, "r", encoding="utf-8-sig") as f: + config = json.load(f) + + min_watch_percentage = float(config.get("minimum", 85)) + max_watch_percentage = float(config.get("maximum", 95)) + except Exception: + min_watch_percentage = 85 + max_watch_percentage = 95 + + min_watch_time = (video_duration * min_watch_percentage) / 100 + max_watch_time = (video_duration * max_watch_percentage) / 100 + + start_time = time() + last_popup_check = time() + last_ad_check = time() + popup_interval = 5 + ad_interval = 3 + + from random import random + + while True: + current_time = time() + elapsed_time = current_time - start_time + + if elapsed_time >= max_watch_time: + break + + if elapsed_time >= min_watch_time and random() < 0.1: + break + + if current_time - last_popup_check >= popup_interval: + bypass_other_popups(driver) + last_popup_check = current_time + + if current_time - last_ad_check >= ad_interval: + if driver.execute_script( + "return document.querySelector('video') !== null" + ): + bypass_ads(driver) + last_ad_check = current_time + + sleep(1) + + return True + + except Exception: + return False + + +def handle_consent_page(driver): + try: + selectors = [ + "button[aria-label='Accept all']", + "button.VfPpkd-LgbsSe-OWXEXe-k8QpJ", + "form[action*='consent.youtube.com'] button", + ] + + for selector in selectors: try: - driver.find_element( - By.CSS_SELECTOR, '[title^="Play (k)"]').click() - except WebDriverException: - try: - driver.execute_script( - "document.querySelector('button.ytp-play-button.ytp-button').click()") - except WebDriverException: - pass + button = WebDriverWait(driver, 5).until( + EC.element_to_be_clickable((By.CSS_SELECTOR, selector)) + ) + button.click() + sleep(2) + return True + except Exception: + continue - skip_again(driver) + return False + + except Exception: + return False def play_music(driver): try: - driver.find_element( - By.XPATH, '//*[@id="play-pause-button" and @title="Pause"]') + driver.find_element(By.XPATH, '//*[@id="play-pause-button" and @title="Pause"]') except WebDriverException: try: driver.find_element( - By.XPATH, '//*[@id="play-pause-button" and @title="Play"]').click() + By.XPATH, '//*[@id="play-pause-button" and @title="Play"]' + ).click() except WebDriverException: driver.execute_script( - 'document.querySelector("#play-pause-button").click()') + 'document.querySelector("#play-pause-button").click()' + ) skip_again(driver) @@ -179,23 +285,22 @@ def type_keyword(driver, keyword, retry=False): if retry: for _ in range(30): try: - driver.find_element(By.CSS_SELECTOR, 'input#search').click() + driver.find_element(By.CSS_SELECTOR, "input#search").click() break except WebDriverException: sleep(3) - input_keyword = driver.find_element(By.CSS_SELECTOR, 'input#search') + input_keyword = driver.find_element(By.CSS_SELECTOR, "input#search") input_keyword.clear() for letter in keyword: input_keyword.send_keys(letter) - sleep(uniform(.1, .4)) + sleep(uniform(0.1, 0.4)) method = randint(1, 2) if method == 1: input_keyword.send_keys(Keys.ENTER) else: - icon = driver.find_element( - By.XPATH, '//button[@id="search-icon-legacy"]') + icon = driver.find_element(By.XPATH, '//button[@id="search-icon-legacy"]') ensure_click(driver, icon) @@ -203,33 +308,42 @@ def scroll_search(driver, video_title): msg = None for i in range(1, 11): try: - section = WebDriverWait(driver, 60).until(EC.visibility_of_element_located( - (By.XPATH, f'//ytd-item-section-renderer[{i}]'))) - if driver.find_element(By.XPATH, f'//ytd-item-section-renderer[{i}]').text == 'No more results': - msg = 'failed' + section = WebDriverWait(driver, 60).until( + EC.visibility_of_element_located( + (By.XPATH, f"//ytd-item-section-renderer[{i}]") + ) + ) + if ( + driver.find_element(By.XPATH, f"//ytd-item-section-renderer[{i}]").text + == "No more results" + ): + msg = "failed" break if len(video_title) == 11: find_video = section.find_element( - By.XPATH, f'//a[@id="video-title" and contains(@href, "{video_title}")]') + By.XPATH, + f'//a[@id="video-title" and contains(@href, "{video_title}")]', + ) else: find_video = section.find_element( - By.XPATH, f'//*[@title="{video_title}"]') + By.XPATH, f'//*[@title="{video_title}"]' + ) - driver.execute_script( - "arguments[0].scrollIntoViewIfNeeded();", find_video) + driver.execute_script("arguments[0].scrollIntoViewIfNeeded();", find_video) sleep(1) bypass_popup(driver) ensure_click(driver, find_video) - msg = 'success' + msg = "success" break except NoSuchElementException: sleep(randint(2, 5)) - WebDriverWait(driver, 30).until(EC.visibility_of_element_located( - (By.TAG_NAME, 'body'))).send_keys(Keys.CONTROL, Keys.END) + WebDriverWait(driver, 30).until( + EC.visibility_of_element_located((By.TAG_NAME, "body")) + ).send_keys(Keys.CONTROL, Keys.END) if i == 10: - msg = 'failed' + msg = "failed" return msg @@ -242,22 +356,24 @@ def search_video(driver, keyword, video_title): bypass_popup(driver) type_keyword(driver, keyword, retry=True) except WebDriverException: - raise Exception( - "Slow internet speed or Stuck at recaptcha! Can't perform search keyword") + return "failed" msg = scroll_search(driver, video_title) - if msg == 'failed': + if msg == "failed": bypass_popup(driver) - filters = driver.find_element(By.CSS_SELECTOR, '#filter-menu button') - driver.execute_script('arguments[0].scrollIntoViewIfNeeded()', filters) + filters = driver.find_element(By.CSS_SELECTOR, "#filter-menu button") + driver.execute_script("arguments[0].scrollIntoViewIfNeeded()", filters) sleep(randint(1, 3)) ensure_click(driver, filters) sleep(randint(1, 3)) - sort = WebDriverWait(driver, 30).until(EC.element_to_be_clickable( - (By.XPATH, '//div[@title="Sort by upload date"]'))) + sort = WebDriverWait(driver, 30).until( + EC.element_to_be_clickable( + (By.XPATH, '//div[@title="Sort by upload date"]') + ) + ) ensure_click(driver, sort) msg = scroll_search(driver, video_title) diff --git a/youtubeviewer/bypass.py b/youtubeviewer/bypass.py index 041f2f4..ee35477 100644 --- a/youtubeviewer/bypass.py +++ b/youtubeviewer/bypass.py @@ -21,12 +21,14 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ + +import random from random import choice, choices, randint, shuffle, uniform from time import sleep - +from datetime import datetime +from .colors import * from selenium import webdriver -from selenium.common.exceptions import (NoSuchElementException, - WebDriverException) +from selenium.common.exceptions import NoSuchElementException, WebDriverException from selenium.webdriver.chrome.service import Service from selenium.webdriver.common.by import By from selenium.webdriver.common.keys import Keys @@ -43,17 +45,22 @@ def ensure_click(driver, element): def personalization(driver): search = driver.find_element( - By.XPATH, f'//button[@aria-label="Turn {choice(["on","off"])} Search customization"]') + By.XPATH, + f'//button[@aria-label="Turn {choice(["on","off"])} Search customization"]', + ) driver.execute_script("arguments[0].scrollIntoViewIfNeeded();", search) search.click() history = driver.find_element( - By.XPATH, f'//button[@aria-label="Turn {choice(["on","off"])} YouTube History"]') + By.XPATH, f'//button[@aria-label="Turn {choice(["on","off"])} YouTube History"]' + ) driver.execute_script("arguments[0].scrollIntoViewIfNeeded();", history) history.click() ad = driver.find_element( - By.XPATH, f'//button[@aria-label="Turn {choice(["on","off"])} Ad personalization"]') + By.XPATH, + f'//button[@aria-label="Turn {choice(["on","off"])} Ad personalization"]', + ) driver.execute_script("arguments[0].scrollIntoViewIfNeeded();", ad) ad.click() @@ -64,54 +71,223 @@ def personalization(driver): def bypass_consent(driver): try: - consent = driver.find_element(By.XPATH, "//button[@jsname='b3VHJd']") - driver.execute_script("arguments[0].scrollIntoView();", consent) - consent.submit() - if 'consent' in driver.current_url: - personalization(driver) - except WebDriverException: - consent = driver.find_element( - By.XPATH, "//button[@aria-label='Accept all']") - driver.execute_script("arguments[0].scrollIntoView();", consent) - consent.submit() - if 'consent' in driver.current_url: - personalization(driver) + if "consent.youtube.com" in driver.current_url: + driver.execute_script( + """ + const consentOverlay = document.querySelector('div[role="dialog"]'); + if(consentOverlay) consentOverlay.remove(); + + const cookies = { + 'CONSENT': 'YES+yt.432951.en+FX+' + Math.floor(Date.now()/1000), + 'SOCS': 'CAISNAlYLmlOX19pZC4yMDIzLTAyLTA0LTE4LTEwLnByb2QtZmluYWwtZXUtd2VzdC0xLmxpZ2h0', + '__Secure-YEC': Math.floor(Date.now()/1000), + }; + + Object.entries(cookies).forEach(([name, value]) => { + document.cookie = `${name}=${value}; domain=.youtube.com; path=/; secure; SameSite=None`; + }); + """ + ) + + consent_buttons = [ + "button[jsname='j6LnYe']", + "button[jsname='tHlp8d']", + "#accept-button", + "button.VfPpkd-LgbsSe", + "[aria-label*='Accept']", + "button:has-text('Accept all')", + "form[action*='consent'] button", + "button[jsname='b3VHJd']", + ] + + for selector in consent_buttons: + try: + buttons = driver.find_elements(By.CSS_SELECTOR, selector) + for button in buttons: + if button.is_displayed(): + driver.execute_script("arguments[0].click();", button) + sleep(0.5) + except: + continue + + if "consent.youtube.com" in driver.current_url: + original_url = driver.current_url.split("continue=")[1].split("&")[0] + if original_url: + driver.get(original_url) + + except Exception as e: + print(f"Consent bypass error: {str(e)}") + + +def bypass_ads(driver): + try: + if not driver.execute_script( + "return document.querySelector('.ad-showing') !== null" + ): + return + + try: + skip_button = WebDriverWait(driver, 3).until( + EC.element_to_be_clickable((By.CSS_SELECTOR, ".ytp-ad-skip-button")) + ) + driver.execute_script("arguments[0].click();", skip_button) + return + except: + pass + + driver.execute_script( + """ + const video = document.querySelector('video'); + if(video && document.querySelector('.ad-showing')) { + const duration = video.duration; + if (duration && isFinite(duration)) { + video.currentTime = duration; + } + } + """ + ) + + except Exception: + pass + + +def bypass_other_popups(driver): + try: + popup_selectors = { + "consent": "button[aria-label='Accept all']", + "no_thanks": "button[aria-label='No thanks']", + "dismiss": "button[aria-label='Dismiss']", + "maybe_later": "button[aria-label='Maybe later']", + "close": "button[aria-label='Close']", + "got_it": "button[aria-label='Got it']", + "reject": "button[aria-label='Reject all']", + "skip_trial": "button[aria-label='Skip trial']", + "not_now": "button[aria-label='Not now']", + } + + for purpose, selector in popup_selectors.items(): + try: + elements = driver.find_elements(By.CSS_SELECTOR, selector) + for element in elements: + if ( + "player" in element.get_attribute("class").lower() + or "player" in element.get_attribute("id").lower() + or not element.is_displayed() + ): + continue + + driver.execute_script("arguments[0].click();", element) + sleep(3) + return True + except: + continue + + return False + + except Exception: + return False + + +def bypass_other_popups(driver): + try: + common_buttons = [ + "button[aria-label='No thanks']", + "button[aria-label='Dismiss']", + ".yt-spec-button-shape-next--filled", + ] + + for selector in common_buttons: + try: + button = driver.find_element(By.CSS_SELECTOR, selector) + if button.is_displayed(): + driver.execute_script("arguments[0].click();", button) + sleep(1) + except: + continue + + except Exception: + pass def click_popup(driver, element): - driver.execute_script( - "arguments[0].scrollIntoViewIfNeeded();", element) + driver.execute_script("arguments[0].scrollIntoViewIfNeeded();", element) sleep(1) element.click() def bypass_popup(driver): - try: - agree = WebDriverWait(driver, 5).until(EC.visibility_of_element_located( - (By.XPATH, '//*[@aria-label="Agree to the use of cookies and other data for the purposes described"]'))) - click_popup(driver=driver, element=agree) - except WebDriverException: + for _ in range(3): + try: + agree = WebDriverWait(driver, 5).until( + EC.visibility_of_element_located( + ( + By.XPATH, + '//*[@aria-label="Agree to the use of cookies and other data for the purposes described"]', + ) + ) + ) + click_popup(driver=driver, element=agree) + return + except WebDriverException: + pass + try: agree = driver.find_element( - By.XPATH, f'//*[@aria-label="{choice(["Accept","Reject"])} the use of cookies and other data for the purposes described"]') + By.XPATH, + f'//*[@aria-label="{choice(["Accept", "Reject"])} the use of cookies and other data for the purposes described"]', + ) click_popup(driver=driver, element=agree) + return except WebDriverException: pass def bypass_other_popup(driver): - popups = ['Got it', 'Skip trial', 'No thanks', 'Dismiss', 'Not now'] + popups = ["Got it", "Skip trial", "No thanks", "Dismiss", "Not now"] shuffle(popups) for popup in popups: try: driver.find_element( - By.XPATH, f"//*[@id='button' and @aria-label='{popup}']").click() + By.XPATH, f"//*[@id='button' and @aria-label='{popup}']" + ).click() except WebDriverException: pass try: driver.find_element( - By.XPATH, '//*[@id="dismiss-button"]/yt-button-shape/button').click() + By.XPATH, '//*[@id="dismiss-button"]/yt-button-shape/button' + ).click() except WebDriverException: pass + + +def bypass_stuck_page(driver, urls, position): + try: + if not urls: + return None + + current_url = driver.current_url + + if ( + current_url == "https://www.youtube.com/" + or "accounts.google.com" in current_url + ): + shuffled_urls = list(urls) + random.shuffle(shuffled_urls) + + for url in shuffled_urls: + driver.get(url) + sleep(2) + + if "accounts.google.com" not in driver.current_url: + return url + else: + continue + + return None + + return current_url + + except Exception: + return None diff --git a/youtubeviewer/config.py b/youtubeviewer/config.py index 40e7cf6..a11c000 100644 --- a/youtubeviewer/config.py +++ b/youtubeviewer/config.py @@ -21,45 +21,48 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ + import json -PROXY_TYPES = { - '1': 'http', - '2': 'socks4', - '3': 'socks5', - '4': False -} +PROXY_TYPES = {"1": "http", "2": "socks4", "3": "socks5", "4": False} def config_api(config): port = 5000 - http_api = str(input( - bcolors.OKBLUE + '\nDo you want to enable a HTTP API on local server? (default=Yes) [Yes/no] : ' + bcolors.ENDC)).lower() - - if http_api == 'n' or http_api == 'no': + http_api = str( + input( + bcolors.OKBLUE + + "\nDo you want to enable a HTTP API on local server? (default=Yes) [Yes/no] : " + + bcolors.ENDC + ) + ).lower() + + if http_api == "n" or http_api == "no": enabled = False else: enabled = True - port = input(bcolors.OKCYAN + - '\nEnter a free port (default=5000) : ' + bcolors.ENDC) + port = input( + bcolors.OKCYAN + "\nEnter a free port (default=5000) : " + bcolors.ENDC + ) try: port = int(port) except Exception: port = 5000 - config["http_api"] = { - "enabled": enabled, - "host": "0.0.0.0", - "port": port - } + config["http_api"] = {"enabled": enabled, "host": "0.0.0.0", "port": port} return config def config_database(config): - database = str(input( - bcolors.OKBLUE + '\nDo you want to store your daily generated view counts in a Database ? (default=Yes) [Yes/no] : ' + bcolors.ENDC)).lower() - - if database == 'n' or database == 'no': + database = str( + input( + bcolors.OKBLUE + + "\nDo you want to store your daily generated view counts in a Database ? (default=Yes) [Yes/no] : " + + bcolors.ENDC + ) + ).lower() + + if database == "n" or database == "no": database = False else: database = True @@ -72,11 +75,12 @@ def config_views(config): for _ in range(10): try: views = float( - input(bcolors.WARNING + '\nAmount of views you want : ' + bcolors.ENDC)) + input(bcolors.WARNING + "\nAmount of views you want : " + bcolors.ENDC) + ) break except Exception as e: print(bcolors.FAIL + e.args[0] + bcolors.ENDC) - print(bcolors.BOLD + 'Type a number like 1000 ' + bcolors.ENDC) + print(bcolors.BOLD + "Type a number like 1000 " + bcolors.ENDC) try: config["views"] = int(views) except Exception: @@ -85,20 +89,32 @@ def config_views(config): def config_min_max(config): - print(bcolors.WARNING + - '\n--> Minimum and Maximum watch duration percentages have no impact on live streams.' + bcolors.ENDC) - print(bcolors.WARNING + - '--> For live streaming, script will play the video until the stream is finished.' + bcolors.ENDC) + print( + bcolors.WARNING + + "\n--> Minimum and Maximum watch duration percentages have no impact on live streams." + + bcolors.ENDC + ) + print( + bcolors.WARNING + + "--> For live streaming, script will play the video until the stream is finished." + + bcolors.ENDC + ) minimum = input( - bcolors.WARNING + '\nMinimum watch duration in percentage (default = 85) : ' + bcolors.ENDC) + bcolors.WARNING + + "\nMinimum watch duration in percentage (default = 85) : " + + bcolors.ENDC + ) try: minimum = float(minimum) except Exception: minimum = 85.0 maximum = input( - bcolors.WARNING + '\nMaximum watch duration in percentage (default = 95) : ' + bcolors.ENDC) + bcolors.WARNING + + "\nMaximum watch duration in percentage (default = 95) : " + + bcolors.ENDC + ) try: maximum = float(maximum) except Exception: @@ -116,27 +132,45 @@ def config_free_proxy(category): auth_required = False proxy_api = False - handle_proxy = str(input( - bcolors.OKBLUE + '\nLet YouTube Viewer handle proxies ? (recommended=No) [No/yes] : ' + bcolors.ENDC)).lower() + handle_proxy = str( + input( + bcolors.OKBLUE + + "\nLet YouTube Viewer handle proxies ? (recommended=No) [No/yes] : " + + bcolors.ENDC + ) + ).lower() - if handle_proxy == 'y' or handle_proxy == 'yes': + if handle_proxy == "y" or handle_proxy == "yes": proxy_type = False filename = False else: filename = "" while not filename: - filename = str(input( - bcolors.OKCYAN + '\nEnter your proxy File Name or Proxy API link : ' + bcolors.ENDC)) - - if 'http://' in filename or 'https://' in filename: + filename = str( + input( + bcolors.OKCYAN + + "\nEnter your proxy File Name or Proxy API link : " + + bcolors.ENDC + ) + ) + + if "http://" in filename or "https://" in filename: proxy_api = True - handle_proxy = str(input( - bcolors.OKBLUE + "\nSelect proxy type [1 = HTTP , 2 = SOCKS4, 3 = SOCKS5, 4 = ALL] : " + bcolors.ENDC)).lower() - while handle_proxy not in ['1', '2', '3', '4']: - handle_proxy = str(input( - '\nPlease input 1 for HTTP, 2 for SOCKS4, 3 for SOCKS5 and 4 for ALL proxy type : ')).lower() + handle_proxy = str( + input( + bcolors.OKBLUE + + "\nSelect proxy type [1 = HTTP , 2 = SOCKS4, 3 = SOCKS5, 4 = ALL] : " + + bcolors.ENDC + ) + ).lower() + while handle_proxy not in ["1", "2", "3", "4"]: + handle_proxy = str( + input( + "\nPlease input 1 for HTTP, 2 for SOCKS4, 3 for SOCKS5 and 4 for ALL proxy type : " + ) + ).lower() proxy_type = PROXY_TYPES[handle_proxy] @@ -147,86 +181,146 @@ def config_premium_proxy(category): auth_required = False proxy_api = False - if category == 'r': - print(bcolors.WARNING + '\n--> If you use the Proxy API link, script will scrape the proxy list on each thread start.' + bcolors.ENDC) - print(bcolors.WARNING + '--> And will use one proxy randomly from that list to ensure session management.' + bcolors.ENDC) + if category == "r": + print( + bcolors.WARNING + + "\n--> If you use the Proxy API link, script will scrape the proxy list on each thread start." + + bcolors.ENDC + ) + print( + bcolors.WARNING + + "--> And will use one proxy randomly from that list to ensure session management." + + bcolors.ENDC + ) filename = "" while not filename: - filename = str(input( - bcolors.OKCYAN + '\nEnter your Rotating Proxy service Main Gateway or Proxy API link : ' + bcolors.ENDC)) - - if 'http://' in filename or 'https://' in filename: + filename = str( + input( + bcolors.OKCYAN + + "\nEnter your Rotating Proxy service Main Gateway or Proxy API link : " + + bcolors.ENDC + ) + ) + + if "http://" in filename or "https://" in filename: proxy_api = True - auth_required = input(bcolors.OKCYAN + - '\nProxies need authentication? (default=No) [No/yes] : ' + bcolors.ENDC).lower() - if auth_required == 'y' or auth_required == 'yes': + auth_required = input( + bcolors.OKCYAN + + "\nProxies need authentication? (default=No) [No/yes] : " + + bcolors.ENDC + ).lower() + if auth_required == "y" or auth_required == "yes": auth_required = True - proxy_type = 'http' + proxy_type = "http" else: auth_required = False else: - if '@' in filename: + if "@" in filename: auth_required = True - proxy_type = 'http' - elif filename.count(':') == 3: - split = filename.split(':') - filename = f'{split[2]}:{split[-1]}@{split[0]}:{split[1]}' + proxy_type = "http" + elif filename.count(":") == 3: + split = filename.split(":") + filename = f"{split[2]}:{split[-1]}@{split[0]}:{split[1]}" auth_required = True - proxy_type = 'http' + proxy_type = "http" if not auth_required: - handle_proxy = str(input( - bcolors.OKBLUE + "\nSelect proxy type [1 = HTTP , 2 = SOCKS4, 3 = SOCKS5] : " + bcolors.ENDC)).lower() - while handle_proxy not in ['1', '2', '3']: - handle_proxy = str(input( - '\nPlease input 1 for HTTP, 2 for SOCKS4 and 3 for SOCKS5 proxy type : ')).lower() + handle_proxy = str( + input( + bcolors.OKBLUE + + "\nSelect proxy type [1 = HTTP , 2 = SOCKS4, 3 = SOCKS5] : " + + bcolors.ENDC + ) + ).lower() + while handle_proxy not in ["1", "2", "3"]: + handle_proxy = str( + input( + "\nPlease input 1 for HTTP, 2 for SOCKS4 and 3 for SOCKS5 proxy type : " + ) + ).lower() proxy_type = PROXY_TYPES[handle_proxy] else: filename = "" while not filename: - filename = str(input( - bcolors.OKCYAN + '\nEnter your proxy File Name or Proxy API link : ' + bcolors.ENDC)) + filename = str( + input( + bcolors.OKCYAN + + "\nEnter your proxy File Name or Proxy API link : " + + bcolors.ENDC + ) + ) auth_required = True - proxy_type = 'http' - if 'http://' in filename or 'https://' in filename: + proxy_type = "http" + if "http://" in filename or "https://" in filename: proxy_api = True return proxy_type, filename, auth_required, proxy_api def config_proxy(config): - print(bcolors.WARNING + - '\n--> Free proxy : NO authentication required | Format : [IP:PORT] | Type : HTTP/SOCKS4/SOCKS5' + bcolors.ENDC) - print(bcolors.WARNING + - '--> Premium proxy : authentication required | Format : [USER:PASS@IP:PORT] or [IP:PORT:USER:PASS] | Type : HTTP only' + bcolors.ENDC) - print(bcolors.WARNING + '--> Rotating proxy : follows Free proxy for no authentication and vice versa' + bcolors.ENDC) - category = input(bcolors.OKCYAN + "\nWhat's your proxy category? " + - "[F = Free, P = Premium, R = Rotating Proxy] : " + bcolors.ENDC).lower() - while category not in ['f', 'p', 'r']: + print( + bcolors.WARNING + + "\n--> Free proxy : NO authentication required | Format : [IP:PORT] | Type : HTTP/SOCKS4/SOCKS5" + + bcolors.ENDC + ) + print( + bcolors.WARNING + + "--> Premium proxy : authentication required | Format : [USER:PASS@IP:PORT] or [IP:PORT:USER:PASS] | Type : HTTP only" + + bcolors.ENDC + ) + print( + bcolors.WARNING + + "--> Rotating proxy : follows Free proxy for no authentication and vice versa" + + bcolors.ENDC + ) + category = input( + bcolors.OKCYAN + + "\nWhat's your proxy category? " + + "[F = Free, P = Premium, R = Rotating Proxy] : " + + bcolors.ENDC + ).lower() + while category not in ["f", "p", "r"]: category = input( - '\nPlease input F for Free, P for Premium and R for Rotating proxy : ').lower() + "\nPlease input F for Free, P for Premium and R for Rotating proxy : " + ).lower() - if category == 'f': - proxy_type, filename, auth_required, proxy_api = config_free_proxy( - category) + if category == "f": + proxy_type, filename, auth_required, proxy_api = config_free_proxy(category) - elif category == 'p' or category == 'r': - proxy_type, filename, auth_required, proxy_api = config_premium_proxy( - category) + elif category == "p" or category == "r": + proxy_type, filename, auth_required, proxy_api = config_premium_proxy(category) refresh = 0.0 - if category != 'r' and filename: - print(bcolors.WARNING + '\n--> Refresh interval means after every X minutes, program will reload proxies from your File or API' + bcolors.ENDC) - print(bcolors.WARNING + '--> You should use this if and only if there will be new proxies in your File or API after every X minutes.' + bcolors.ENDC) - print(bcolors.WARNING + - '--> Otherwise just enter 0 as the interval.' + bcolors.ENDC) + if category != "r" and filename: + print( + bcolors.WARNING + + "\n--> Refresh interval means after every X minutes, program will reload proxies from your File or API" + + bcolors.ENDC + ) + print( + bcolors.WARNING + + "--> You should use this if and only if there will be new proxies in your File or API after every X minutes." + + bcolors.ENDC + ) + print( + bcolors.WARNING + + "--> Otherwise just enter 0 as the interval." + + bcolors.ENDC + ) try: - refresh = abs(float(input( - bcolors.OKCYAN+'\nEnter a interval to reload proxies from File or API (in minute) [default=0]: ' + bcolors.ENDC))) + refresh = abs( + float( + input( + bcolors.OKCYAN + + "\nEnter a interval to reload proxies from File or API (in minute) [default=0]: " + + bcolors.ENDC + ) + ) + ) except Exception: refresh = 0.0 @@ -236,16 +330,21 @@ def config_proxy(config): "filename": filename, "authentication": auth_required, "proxy_api": proxy_api, - "refresh": refresh + "refresh": refresh, } return config def config_gui(config): - gui = str(input( - bcolors.OKCYAN + '\nDo you want to run in headless(background) mode? (recommended=No) [No/yes] : ' + bcolors.ENDC)).lower() - - if gui == 'y' or gui == 'yes': + gui = str( + input( + bcolors.OKCYAN + + "\nDo you want to run in headless(background) mode? (recommended=No) [No/yes] : " + + bcolors.ENDC + ) + ).lower() + + if gui == "y" or gui == "yes": background = True else: background = False @@ -255,10 +354,15 @@ def config_gui(config): def config_bandwidth(config): - bandwidth = str(input( - bcolors.OKBLUE + '\nReduce video quality to save Bandwidth? (recommended=No) [No/yes] : ' + bcolors.ENDC)).lower() - - if bandwidth == 'y' or bandwidth == 'yes': + bandwidth = str( + input( + bcolors.OKBLUE + + "\nReduce video quality to save Bandwidth? (recommended=No) [No/yes] : " + + bcolors.ENDC + ) + ).lower() + + if bandwidth == "y" or bandwidth == "yes": bandwidth = True else: bandwidth = False @@ -269,10 +373,12 @@ def config_bandwidth(config): def config_playback(config): playback_speed = input( - bcolors.OKBLUE + '\nChoose Playback speed [1 = Normal(1x), 2 = Slow(random .25x, .5x, .75x), 3 = Fast(random 1.25x, 1.5x, 1.75x)] (default = 1) : ' + bcolors.ENDC) + bcolors.OKBLUE + + "\nChoose Playback speed [1 = Normal(1x), 2 = Slow(random .25x, .5x, .75x), 3 = Fast(random 1.25x, 1.5x, 1.75x)] (default = 1) : " + + bcolors.ENDC + ) try: - playback_speed = int(playback_speed) if playback_speed in [ - '2', '3'] else 1 + playback_speed = int(playback_speed) if playback_speed in ["2", "3"] else 1 except Exception: playback_speed = 1 @@ -281,20 +387,32 @@ def config_playback(config): def config_threads(config): - print(bcolors.WARNING + - '\n--> Script will dynamically update thread amount when proxy reload happens.' + bcolors.ENDC) - print(bcolors.WARNING + - '--> If you wish to use the same amount of threads all the time, enter the same number in Maximum and Minimum threads.' + bcolors.ENDC) + print( + bcolors.WARNING + + "\n--> Script will dynamically update thread amount when proxy reload happens." + + bcolors.ENDC + ) + print( + bcolors.WARNING + + "--> If you wish to use the same amount of threads all the time, enter the same number in Maximum and Minimum threads." + + bcolors.ENDC + ) max_threads = input( - bcolors.OKCYAN + '\nMaximum Threads [Amount of chrome driver you want to use] (recommended = 5): ' + bcolors.ENDC) + bcolors.OKCYAN + + "\nMaximum Threads [Amount of chrome driver you want to use] (recommended = 5): " + + bcolors.ENDC + ) try: max_threads = int(max_threads) except Exception: max_threads = 5 min_threads = input( - bcolors.OKCYAN + '\nMinimum Threads [Amount of chrome driver you want to use] (recommended = 2): ' + bcolors.ENDC) + bcolors.OKCYAN + + "\nMinimum Threads [Amount of chrome driver you want to use] (recommended = 2): " + + bcolors.ENDC + ) try: min_threads = int(min_threads) except Exception: @@ -309,8 +427,16 @@ def config_threads(config): def create_config(config_path): - print(bcolors.WARNING + '\n--> Your preferences will be saved so that you don\'t need to answer these questions again.' + bcolors.ENDC) - print(bcolors.WARNING + '--> Just Hit Enter to accept default or recommended values without typing anything.' + bcolors.ENDC) + print( + bcolors.WARNING + + "\n--> Your preferences will be saved so that you don't need to answer these questions again." + + bcolors.ENDC + ) + print( + bcolors.WARNING + + "--> Just Hit Enter to accept default or recommended values without typing anything." + + bcolors.ENDC + ) config = {} @@ -334,16 +460,24 @@ def create_config(config_path): json_object = json.dumps(config, indent=4) - with open(config_path, "w", encoding='utf-8-sig') as outfile: + with open(config_path, "w", encoding="utf-8-sig") as outfile: outfile.write(json_object) - print(bcolors.OKGREEN + '\n--> Your preferences are saved in config.json. You can always create a new config file from youtube_viewer.py' + bcolors.ENDC) - print(bcolors.OKGREEN + - '--> Or by running `python youtubeviewer/config.py` ' + bcolors.ENDC) + print( + bcolors.OKGREEN + + "\n--> Your preferences are saved in config.json. You can always create a new config file from youtube_viewer.py" + + bcolors.ENDC + ) + print( + bcolors.OKGREEN + + "--> Or by running `python youtubeviewer/config.py` " + + bcolors.ENDC + ) -if __name__ == '__main__': +if __name__ == "__main__": from colors import * - create_config(config_path='config.json') + + create_config(config_path="config.json") else: from .colors import * diff --git a/youtubeviewer/database.py b/youtubeviewer/database.py index d5039e9..568ecab 100644 --- a/youtubeviewer/database.py +++ b/youtubeviewer/database.py @@ -21,6 +21,7 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ + import os import shutil import sqlite3 @@ -31,8 +32,10 @@ def create_database(database, database_backup): with closing(sqlite3.connect(database)) as connection: with closing(connection.cursor()) as cursor: - cursor.execute("""CREATE TABLE IF NOT EXISTS - statistics (date TEXT, view INTEGER)""") + cursor.execute( + """CREATE TABLE IF NOT EXISTS + statistics (date TEXT, view INTEGER)""" + ) connection.commit() @@ -49,16 +52,19 @@ def create_database(database, database_backup): def update_database(database, threads, increment=1): today = str(datetime.today().date()) - with closing(sqlite3.connect(database, timeout=threads*10)) as connection: + with closing(sqlite3.connect(database, timeout=threads * 10)) as connection: with closing(connection.cursor()) as cursor: try: - cursor.execute( - "SELECT view FROM statistics WHERE date = ?", (today,)) + cursor.execute("SELECT view FROM statistics WHERE date = ?", (today,)) previous_count = cursor.fetchone()[0] - cursor.execute("UPDATE statistics SET view = ? WHERE date = ?", - (previous_count + increment, today)) + cursor.execute( + "UPDATE statistics SET view = ? WHERE date = ?", + (previous_count + increment, today), + ) except Exception: cursor.execute( - "INSERT INTO statistics VALUES (?, ?)", (today, 0),) + "INSERT INTO statistics VALUES (?, ?)", + (today, 0), + ) connection.commit() diff --git a/youtubeviewer/download_driver.py b/youtubeviewer/download_driver.py index 230eba1..c60f46f 100644 --- a/youtubeviewer/download_driver.py +++ b/youtubeviewer/download_driver.py @@ -21,106 +21,371 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ + +import time +import os import platform import shutil import subprocess import sys - -import undetected_chromedriver._compat as uc +import requests +import urllib.request +import zipfile +from selenium import webdriver +import undetected_chromedriver as uc +from packaging.version import parse as parse_version from .colors import * -CHROME = ['{8A69D345-D564-463c-AFF1-A69D9E530F96}', - '{8237E44A-0054-442C-B6B6-EA0509993955}', - '{401C381F-E0DE-4B85-8BD8-3F3F14FBDA57}', - '{4ea16ac7-fd5a-47c3-875b-dbf4a2008c20}'] +CHROME_KEYS = [ + "{8A69D345-D564-463c-AFF1-A69D9E530F96}", + "{8237E44A-0054-442C-B6B6-EA0509993955}", + "{401C381F-E0DE-4B85-8BD8-3F3F14FBDA57}", + "{4ea16ac7-fd5a-47c3-875b-dbf4a2008c20}", +] -def download_driver(patched_drivers): - osname = platform.system() - - print(bcolors.WARNING + 'Getting Chrome Driver...' + bcolors.ENDC) - - if osname == 'Linux': - osname = 'lin' - exe_name = "" - with subprocess.Popen(['google-chrome-stable', '--version'], stdout=subprocess.PIPE) as proc: - version = proc.stdout.read().decode('utf-8').replace('Google Chrome', '').strip() - elif osname == 'Darwin': - osname = 'mac' - exe_name = "" - process = subprocess.Popen( - ['/Applications/Google Chrome.app/Contents/MacOS/Google Chrome', '--version'], stdout=subprocess.PIPE) - version = process.communicate()[0].decode( - 'UTF-8').replace('Google Chrome', '').strip() - elif osname == 'Windows': - osname = 'win' - exe_name = ".exe" - version = None - try: +def get_chrome_version(osname): + """ + Detects the installed version of Google Chrome based on the operating system. + Returns the Chrome version as a string. + """ + try: + if osname == "Linux": + version = None + for browser in ["google-chrome", "google-chrome-stable"]: + try: + process = subprocess.Popen( + [browser, "--version"], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + output = process.communicate()[0] + if output: + version = ( + output.decode("utf-8").replace("Google Chrome", "").strip() + ) + break + except: + continue + + if not version: + raise ValueError("Chrome not found on Linux system") + elif osname == "Darwin": process = subprocess.Popen( - ['reg', 'query', 'HKEY_CURRENT_USER\\Software\\Google\\Chrome\\BLBeacon', '/v', 'version'], - stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL + [ + "/Applications/Google Chrome.app/Contents/MacOS/Google Chrome", + "--version", + ], + stdout=subprocess.PIPE, ) - version = process.communicate()[0].decode( - 'UTF-8').strip().split()[-1] - except Exception: - for i in CHROME: - for j in ['opv', 'pv']: - try: - command = [ - 'reg', 'query', f'HKEY_LOCAL_MACHINE\\Software\\Google\\Update\\Clients\\{i}', '/v', f'{j}', '/reg:32'] - process = subprocess.Popen( - command, - stdout=subprocess.PIPE, stderr=subprocess.DEVNULL, stdin=subprocess.DEVNULL - ) - version = process.communicate()[0].decode( - 'UTF-8').strip().split()[-1] - except Exception: - pass + version = ( + process.communicate()[0] + .decode("UTF-8") + .replace("Google Chrome", "") + .strip() + ) + elif osname == "Windows": + version = detect_chrome_version_windows() + else: + raise OSError(f"Unsupported operating system: {osname}") if not version: - print(bcolors.WARNING + - "Couldn't find your Google Chrome version automatically!" + bcolors.ENDC) - version = input(bcolors.WARNING + - 'Please input your google chrome version (ex: 91.0.4472.114) : ' + bcolors.ENDC) - else: - input('{} OS is not supported.'.format(osname)) - sys.exit() + raise ValueError("Failed to detect Google Chrome version.") + return version + + except Exception as e: + print(bcolors.FAIL + f"Error detecting Chrome version: {e}" + bcolors.ENDC) + sys.exit("Ensure Google Chrome is installed and accessible.") + + +def detect_chrome_version_windows(): + """ + Detects the installed version of Google Chrome on Windows using the registry. + Returns the Chrome version as a string. If manual input is needed, saves it for future use. + """ + # First try registry + for key in CHROME_KEYS: + for subkey in ["opv", "pv"]: + try: + process = subprocess.Popen( + [ + "reg", + "query", + f"HKEY_LOCAL_MACHINE\\Software\\Google\\Update\\Clients\\{key}", + "/v", + subkey, + ], + stdout=subprocess.PIPE, + stderr=subprocess.DEVNULL, + stdin=subprocess.DEVNULL, + ) + version = process.communicate()[0].decode("utf-8").strip().split()[-1] + if version: + return version + except Exception: + continue + + # Try to load saved version + version_file = os.path.join( + os.path.expanduser("~"), ".youtube_viewer_chrome_version" + ) + try: + with open(version_file, "r") as f: + saved_version = f.read().strip() + print( + bcolors.OKGREEN + + f"Using saved Chrome version: {saved_version}" + + bcolors.ENDC + ) + return saved_version + except FileNotFoundError: + pass + + # If no saved version, ask user + print(bcolors.WARNING + "Couldn't find Chrome version in registry." + bcolors.ENDC) + version = input( + bcolors.WARNING + + "Please input your Google Chrome version (e.g., 91.0.4472.114): " + + bcolors.ENDC + ) + + # Save the manually entered version + try: + with open(version_file, "w") as f: + f.write(version) + print(bcolors.OKGREEN + "Chrome version saved for future use." + bcolors.ENDC) + except Exception as e: + print(bcolors.WARNING + f"Could not save Chrome version: {e}" + bcolors.ENDC) + return version + + +def download_driver(patched_drivers): + """Downloads ChromeDriver and sets it up.""" try: - with open('version.txt', 'r') as f: - previous_version = f.read() - except Exception: - previous_version = '0' + # Determine OS and executable name + if sys.platform.startswith("linux"): + osname = "Linux" + elif sys.platform == "darwin": + osname = "Darwin" + elif sys.platform.startswith("win"): + osname = "Windows" + else: + raise OSError("Unknown OS Type") + + exe_name = ".exe" if osname == "Windows" else "" - with open('version.txt', 'w') as f: - f.write(version) + print("Getting Chrome Driver...") + + # Check if chromedriver exists + current_driver_path = os.path.join(os.getcwd(), f"chromedriver{exe_name}") + if not os.path.exists(current_driver_path): + print("ChromeDriver not found, downloading...") + # Proceed to download the driver + driver_path = download_driver_alternative(osname.lower(), exe_name) + else: + driver_path = current_driver_path + + # Get Chrome version + chrome_version = get_chrome_version(osname) + if chrome_version: + major_version = chrome_version.split(".")[0] + print(f"Detected Chrome version: {chrome_version}") + else: + print("Could not detect Chrome version, using default configuration") + major_version = None - if version != previous_version: try: - os.remove(f'chromedriver{exe_name}') - except Exception: - pass + # Try using undetected-chromedriver + options = uc.ChromeOptions() + options.add_argument("--no-sandbox") + options.add_argument("--disable-dev-shm-usage") + + driver = uc.Chrome( + version_main=int(major_version) if major_version else None, + options=options, + ) - shutil.rmtree(patched_drivers, ignore_errors=True) + # Get driver path before quitting + if hasattr(driver, "browser_executable_path"): + driver_path = driver.browser_executable_path + else: + driver_path = os.path.join(os.getcwd(), f"chromedriver{exe_name}") - major_version = version.split('.')[0] + driver.quit() - uc.TARGET_VERSION = major_version + except Exception as e: + print(f"Error with undetected-chromedriver: {e}") + print("Falling back to direct ChromeDriver download...") - uc.install() + # Create patched drivers directory + os.makedirs(patched_drivers, exist_ok=True) - return osname, exe_name + return osname, exe_name, driver_path + + except Exception as e: + print(f"Error in download_driver: {e}") + sys.exit(1) + + +def download_driver_alternative(osname, exe_name): + """Alternative direct download from Chrome for Testing.""" + try: + # Define the stable version and corresponding URLs + stable_version = "133.0.6943.53" + os_map = { + "linux": f"https://storage.googleapis.com/chrome-for-testing-public/{stable_version}/linux64/chromedriver-linux64.zip", + "windows": f"https://storage.googleapis.com/chrome-for-testing-public/{stable_version}/win64/chromedriver-win64.zip", + "darwin": ( + f"https://storage.googleapis.com/chrome-for-testing-public/{stable_version}/mac-x64/chromedriver-mac-x64.zip" + if platform.machine() == "x86_64" + else f"https://storage.googleapis.com/chrome-for-testing-public/{stable_version}/mac-arm64/chromedriver-mac-arm64.zip" + ), + } + + if osname not in os_map: + raise ValueError(f"Unsupported OS: {osname}") + + driver_url = os_map[osname] + print(f"Downloading ChromeDriver from: {driver_url}") + + # Download and extract + local_zip = "chromedriver.zip" + response = requests.get(driver_url) + response.raise_for_status() # Raise an error for bad status codes + + with open(local_zip, "wb") as f: + f.write(response.content) + + # Extract the ZIP file + with zipfile.ZipFile(local_zip, "r") as zip_ref: + zip_ref.extractall() # Extract to the current working directory + + # Move driver to final location + # The extracted folder name is based on the OS and version + extracted_folder = ( + f"chromedriver-{osname}" if osname == "darwin" else "chromedriver-win64" + ) + src_path = os.path.join(extracted_folder, f"chromedriver{exe_name}") + dst_path = f"chromedriver{exe_name}" + + if os.path.exists(dst_path): + os.remove(dst_path) + + shutil.move(src_path, dst_path) + + # Set permissions for non-Windows + if osname != "windows": + os.chmod(dst_path, 0o755) + + # Cleanup + os.remove(local_zip) + + return os.path.abspath(dst_path) + + except requests.exceptions.HTTPError as http_err: + print(f"HTTP error occurred: {http_err}") + sys.exit(1) + except Exception as e: + print(f"Alternative driver download failed: {str(e)}") + sys.exit(1) + + +def get_driver_options(background, bandwidth): + options = uc.ChromeOptions() + + options.add_experimental_option( + "prefs", + { + "profile.default_content_settings.cookies": 1, + "profile.block_third_party_cookies": False, + "profile.cookie_controls_mode": 0, + "profile.default_content_setting_values": { + "cookies": 1, + "notifications": 2, + "automatic_downloads": 1, + "plugins": 1, + "popups": 2, + }, + "profile.managed_default_content_settings": { + "cookies": 1, + "images": 1, + "javascript": 1, + "plugins": 1, + "popups": 2, + "geolocation": 2, + "notifications": 2, + "auto_select_certificate": 2, + "fullscreen": 2, + "mouselock": 2, + "mixed_script": 2, + "media_stream": 2, + "media_stream_mic": 2, + "media_stream_camera": 2, + "protocol_handlers": 2, + "ppapi_broker": 2, + "automatic_downloads": 1, + "midi_sysex": 2, + "push_messaging": 2, + "ssl_cert_decisions": 2, + "metro_switch_to_desktop": 2, + "protected_media_identifier": 2, + "app_banner": 2, + "site_engagement": 2, + "durable_storage": 2, + }, + }, + ) + + if background: + options.add_argument("--headless=new") + + if bandwidth: + options.add_argument("--proxy-bypass-list=*") + options.add_argument("--disable-site-isolation-trials") + + return options def copy_drivers(cwd, patched_drivers, exe, total): - current = os.path.join(cwd, f'chromedriver{exe}') - os.makedirs(patched_drivers, exist_ok=True) - for i in range(total+1): - try: - destination = os.path.join( - patched_drivers, f'chromedriver_{i}{exe}') - shutil.copy(current, destination) - except Exception: - pass + """ + Copies the downloaded ChromeDriver to multiple destinations for use by different workers. + """ + try: + current = os.path.join(cwd, f"chromedriver{exe}") + + # Ensure source driver exists + if not os.path.exists(current): + raise FileNotFoundError(f"Source driver not found at {current}") + + # Create patched_drivers directory + os.makedirs(patched_drivers, exist_ok=True) + + for i in range(total + 1): + destination = os.path.join(patched_drivers, f"chromedriver_{i}{exe}") + + if os.path.abspath(current) == os.path.abspath(destination): + continue + + try: + if os.path.exists(destination): + try: + os.remove(destination) + except (FileExistsError, PermissionError): + pass + + shutil.copy2(current, destination) + + if exe == "": + os.chmod(destination, 0o755) + + except Exception as e: + print( + f"{bcolors.WARNING}Failed to copy driver to {destination}: {str(e)}{bcolors.ENDC}" + ) + continue + + except Exception as e: + print(f"{bcolors.FAIL}Error in copy_drivers: {str(e)}{bcolors.ENDC}") + sys.exit(1) diff --git a/youtubeviewer/features.py b/youtubeviewer/features.py index fcdd6d5..b107113 100644 --- a/youtubeviewer/features.py +++ b/youtubeviewer/features.py @@ -21,22 +21,28 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ + from .bypass import * -COMMANDS = [Keys.UP, Keys.DOWN, 'share', 'k', 'j', 'l', 't', 'c'] +COMMANDS = [Keys.UP, Keys.DOWN, "share", "k", "j", "l", "t", "c"] -QUALITY = { - 1: ['144p', "tiny"], - 2: ['240p', "small"], - 3: ['360p', "medium"] -} +QUALITY = {1: ["144p", "tiny"], 2: ["240p", "small"], 3: ["360p", "medium"]} def skip_again(driver): try: - skip_ad = driver.find_element( - By.CLASS_NAME, "ytp-ad-skip-button-container") - driver.execute_script("arguments[0].click();", skip_ad) + skip_selectors = [ + "ytp-ad-skip-button-container", + "ytp-ad-skip-button-text", + "ytp-ad-skip-button", + ] + for selector in skip_selectors: + try: + skip_ad = driver.find_element(By.CLASS_NAME, selector) + driver.execute_script("arguments[0].click();", skip_ad) + return + except WebDriverException: + continue except WebDriverException: pass @@ -44,18 +50,38 @@ def skip_again(driver): def skip_initial_ad(driver, video, duration_dict): video_len = duration_dict.get(video, 0) if video_len > 30: + if "consent.youtube.com" in driver.current_url: + bypass_consent(driver) bypass_popup(driver) try: - skip_ad = WebDriverWait(driver, 15).until(EC.element_to_be_clickable( - (By.CLASS_NAME, "ytp-ad-skip-button-container"))) - + cookie_consent = WebDriverWait(driver, 5).until( + EC.element_to_be_clickable( + ( + By.XPATH, + "//button[contains(text(), 'Accept all') or contains(text(), 'I agree') or contains(text(), 'Accept')]", + ) + ) + ) + cookie_consent.click() + print("Cookie consent accepted.") + except WebDriverException: + pass + try: + skip_ad = WebDriverWait(driver, 15).until( + EC.element_to_be_clickable( + (By.CLASS_NAME, "ytp-ad-skip-button-container") + ) + ) ad_duration = driver.find_element( - By.CLASS_NAME, 'ytp-time-duration').get_attribute('innerText') - ad_duration = sum(x * int(t) - for x, t in zip([60, 1], ad_duration.split(":"))) - ad_duration = ad_duration * uniform(.01, .1) + By.CLASS_NAME, "ytp-time-duration" + ).get_attribute("innerText") + ad_duration = sum( + x * int(t) for x, t in zip([60, 1], ad_duration.split(":")) + ) + ad_duration = ad_duration * uniform(0.01, 0.1) sleep(ad_duration) skip_ad.click() + print("Ad skipped successfully.") except WebDriverException: skip_again(driver) @@ -65,30 +91,47 @@ def save_bandwidth(driver): try: random_quality = QUALITY[quality_index][0] driver.find_element( - By.CSS_SELECTOR, "button.ytp-button.ytp-settings-button").click() - driver.find_element( - By.XPATH, "//div[contains(text(),'Quality')]").click() - - quality = WebDriverWait(driver, 10).until(EC.element_to_be_clickable( - (By.XPATH, f"//span[contains(string(),'{random_quality}')]"))) - driver.execute_script( - "arguments[0].scrollIntoViewIfNeeded();", quality) + By.CSS_SELECTOR, "button.ytp-button.ytp-settings-button" + ).click() + + quality_buttons = [ + "//div[contains(text(),'Quality')]", + "//span[contains(text(),'Quality')]", + "//div[contains(@class, 'ytp-menuitem')]//div[contains(text(),'Quality')]", + ] + + for button in quality_buttons: + try: + driver.find_element(By.XPATH, button).click() + break + except WebDriverException: + continue + + quality = WebDriverWait(driver, 10).until( + EC.element_to_be_clickable( + (By.XPATH, f"//span[contains(string(),'{random_quality}')]") + ) + ) + driver.execute_script("arguments[0].scrollIntoViewIfNeeded();", quality) quality.click() - - except WebDriverException: + print(f"Playback quality set to {random_quality}.") + except WebDriverException as e: + print(f"Failed to set quality using UI: {e}. Falling back to JavaScript.") try: random_quality = QUALITY[quality_index][1] driver.execute_script( - f"document.getElementById('movie_player').setPlaybackQualityRange('{random_quality}')") - except WebDriverException: - pass + f"document.getElementById('movie_player').setPlaybackQualityRange('{random_quality}')" + ) + print(f"Playback quality set to {random_quality} using fallback.") + except WebDriverException as js_error: + print(f"Failed to set playback quality: {js_error}") def change_playback_speed(driver, playback_speed): if playback_speed == 2: - driver.find_element(By.ID, 'movie_player').send_keys('<'*randint(1, 3)) + driver.find_element(By.ID, "movie_player").send_keys("<" * randint(1, 3)) elif playback_speed == 3: - driver.find_element(By.ID, 'movie_player').send_keys('>'*randint(1, 3)) + driver.find_element(By.ID, "movie_player").send_keys(">" * randint(1, 3)) def random_command(driver): @@ -98,30 +141,37 @@ def random_command(driver): option = choices([1, 2], cum_weights=(0.7, 1.00), k=1)[0] if option == 2: command = choice(COMMANDS) - if command in ['m', 't', 'c']: - driver.find_element(By.ID, 'movie_player').send_keys(command) - elif command == 'k': + if command in ["m", "t", "c"]: + driver.find_element(By.ID, "movie_player").send_keys(command) + elif command == "k": if randint(1, 2) == 1: - driver.find_element( - By.ID, 'movie_player').send_keys(command) + driver.find_element(By.ID, "movie_player").send_keys(command) driver.execute_script( - f'document.querySelector("#comments").{choice(["scrollIntoView", "scrollIntoViewIfNeeded"])}();') + f'document.querySelector("#comments").{choice(["scrollIntoView", "scrollIntoViewIfNeeded"])}();' + ) sleep(uniform(4, 10)) driver.execute_script( - 'document.querySelector("#movie_player").scrollIntoViewIfNeeded();') - elif command == 'share': + 'document.querySelector("#movie_player").scrollIntoViewIfNeeded();' + ) + elif command == "share": if choices([1, 2], cum_weights=(0.9, 1.00), k=1)[0] == 2: driver.find_element( - By.XPATH, "//button[@id='button' and @aria-label='Share']").click() + By.XPATH, "//button[@id='button' and @aria-label='Share']" + ).click() sleep(uniform(2, 5)) if randint(1, 2) == 1: - WebDriverWait(driver, 10).until(EC.element_to_be_clickable( - (By.XPATH, "//*[@id='button' and @aria-label='Copy']"))).click() + WebDriverWait(driver, 10).until( + EC.element_to_be_clickable( + (By.XPATH, "//*[@id='button' and @aria-label='Copy']") + ) + ).click() driver.find_element( - By.XPATH, "//*[@id='close-button']/button[@aria-label='Cancel']").click() + By.XPATH, "//*[@id='close-button']/button[@aria-label='Cancel']" + ).click() else: - driver.find_element(By.ID, - 'movie_player').send_keys(command*randint(1, 5)) + driver.find_element(By.ID, "movie_player").send_keys( + command * randint(1, 5) + ) except WebDriverException: pass @@ -137,7 +187,7 @@ def wait_for_new_page(driver, previous_url=False, previous_title=False): break -def play_next_video(driver, suggested): +def play_next_video(driver, suggested, muted): shuffle(suggested) video_id = choice(suggested) @@ -148,92 +198,107 @@ def play_next_video(driver, suggested): break try: - driver.execute_script( - 'document.querySelector("tp-yt-paper-button#expand").click()') - js = f''' - var html = 'https://www.youtube.com/watch?v={video_id}
' - - var element = document.querySelector("#description-inline-expander > yt-formatted-string"); - - element.insertAdjacentHTML( 'afterbegin', html ); - ''' - except WebDriverException: - js = f''' - var html = 'https://www.youtube.com/watch?v={video_id}
' - - var elements = document.querySelectorAll("#description > yt-formatted-string"); - var element = elements[elements.length- 1]; + next_video_url = f"/watch?v={video_id}" + js = f""" + function findAndClickVideo() {{ + let suggestions = document.querySelectorAll('a[href*="{video_id}"]'); + if (suggestions.length > 0) {{ + suggestions[0].click(); + return true; + }} + window.location.href = '{next_video_url}'; + return true; + }} + return findAndClickVideo(); + """ + previous_title = driver.title + driver.execute_script(js) - element.insertAdjacentHTML( 'afterbegin', html ); - ''' + wait_for_new_page( + driver=driver, previous_url=False, previous_title=previous_title + ) - driver.execute_script(js) + WebDriverWait(driver, 10).until( + EC.presence_of_element_located((By.ID, "movie_player")) + ) - find_video = WebDriverWait(driver, 30).until(EC.presence_of_element_located( - (By.XPATH, f'//a[@href="/watch?v={video_id}&t=0s"]'))) - driver.execute_script("arguments[0].scrollIntoViewIfNeeded();", find_video) + if muted: + driver.execute_script("document.getElementById('movie_player').mute();") + print("Player muted.") - previous_title = driver.title - driver.execute_script("arguments[0].click();", find_video) - wait_for_new_page(driver=driver, previous_url=False, - previous_title=previous_title) + return driver.title[:-10] - return driver.title[:-10] + except WebDriverException as e: + print(f"Error playing next video: {str(e)}") + try: + driver.get(f"https://www.youtube.com/watch?v={video_id}") + WebDriverWait(driver, 10).until( + EC.presence_of_element_located((By.ID, "movie_player")) + ) + if muted: + driver.execute_script("document.getElementById('movie_player').mute();") + print("Player muted.") + return driver.title[:-10] + except Exception as e: + raise Exception(f"Failed to play next video: {str(e)}") def play_from_channel(driver, actual_channel): - channel = driver.find_elements( - By.CSS_SELECTOR, 'ytd-video-owner-renderer a')[randint(0, 1)] + channel = driver.find_elements(By.CSS_SELECTOR, "ytd-video-owner-renderer a")[ + randint(0, 1) + ] driver.execute_script("arguments[0].scrollIntoViewIfNeeded();", channel) previous_title = driver.title driver.execute_script("arguments[0].click();", channel) - wait_for_new_page(driver=driver, previous_url=False, - previous_title=previous_title) + wait_for_new_page(driver=driver, previous_url=False, previous_title=previous_title) channel_name = driver.title[:-10] if randint(1, 2) == 1: if channel_name != actual_channel: raise Exception( - f"Accidentally opened another channel : {channel_name}. Closing it...") + f"Accidentally opened another channel : {channel_name}. Closing it..." + ) x = randint(30, 50) sleep(x) output = driver.find_element( - By.XPATH, '//yt-formatted-string[@id="title"]/a').text - log = f'Video [{output}] played for {x} seconds from channel home page : {channel_name}' + By.XPATH, '//yt-formatted-string[@id="title"]/a' + ).text + log = f"Video [{output}] played for {x} seconds from channel home page : {channel_name}" option = 4 else: sleep(randint(2, 5)) previous_url = driver.current_url driver.find_element(By.XPATH, "//tp-yt-paper-tab[2]").click() - wait_for_new_page(driver=driver, previous_url=previous_url, - previous_title=False) + wait_for_new_page( + driver=driver, previous_url=previous_url, previous_title=False + ) driver.refresh() videos = WebDriverWait(driver, 10).until( - EC.presence_of_all_elements_located((By.XPATH, "//a[@id='video-title-link']"))) + EC.presence_of_all_elements_located( + (By.XPATH, "//a[@id='video-title-link']") + ) + ) video = choice(videos) driver.execute_script("arguments[0].scrollIntoViewIfNeeded();", video) sleep(randint(2, 5)) previous_title = driver.title ensure_click(driver, video) - wait_for_new_page(driver=driver, previous_url=False, - previous_title=previous_title) + wait_for_new_page( + driver=driver, previous_url=False, previous_title=previous_title + ) output = driver.title[:-10] - log = f'Random video [{output}] played from channel : {channel_name}' + log = f"Random video [{output}] played from channel : {channel_name}" option = 2 - channel_name = driver.find_element( - By.CSS_SELECTOR, '#upload-info a').text + channel_name = driver.find_element(By.CSS_SELECTOR, "#upload-info a").text if channel_name != actual_channel: raise Exception( - f"Accidentally opened video {output} from another channel : {channel_name}. Closing it...") + f"Accidentally opened video {output} from another channel : {channel_name}. Closing it..." + ) return output, log, option @@ -241,24 +306,29 @@ def play_from_channel(driver, actual_channel): def play_end_screen_video(driver): try: driver.find_element(By.CSS_SELECTOR, '[title^="Pause (k)"]') - driver.find_element(By.ID, 'movie_player').send_keys('k') + driver.find_element(By.ID, "movie_player").send_keys("k") except WebDriverException: pass total = driver.execute_script( - "return document.getElementById('movie_player').getDuration()") + "return document.getElementById('movie_player').getDuration()" + ) driver.execute_script( - f"document.querySelector('#movie_player').seekTo({total}-{randint(2,5)})") - end_screen = WebDriverWait(driver, 5).until(EC.presence_of_all_elements_located( - (By.XPATH, "//*[@class='ytp-ce-covering-overlay']"))) + f"document.querySelector('#movie_player').seekTo({total}-{randint(2,5)})" + ) + end_screen = WebDriverWait(driver, 5).until( + EC.presence_of_all_elements_located( + (By.XPATH, "//*[@class='ytp-ce-covering-overlay']") + ) + ) previous_title = driver.title sleep(randint(2, 5)) if end_screen: ensure_click(driver, choice(end_screen)) else: raise Exception( - f'Unfortunately no end screen video found on this video : {previous_title[:-10]}') - wait_for_new_page(driver=driver, previous_url=False, - previous_title=previous_title) + f"Unfortunately no end screen video found on this video : {previous_title[:-10]}" + ) + wait_for_new_page(driver=driver, previous_url=False, previous_title=previous_title) return driver.title[:-10] diff --git a/youtubeviewer/load_files.py b/youtubeviewer/load_files.py index 4e16da1..11023e5 100644 --- a/youtubeviewer/load_files.py +++ b/youtubeviewer/load_files.py @@ -21,6 +21,7 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ + import hashlib from random import choices @@ -28,31 +29,34 @@ def load_url(): - print(bcolors.WARNING + 'Loading urls...' + bcolors.ENDC) + print(bcolors.WARNING + "Loading urls..." + bcolors.ENDC) - with open('urls.txt', encoding="utf-8") as fh: - links = [x.strip() for x in fh if x.strip() != ''] + with open("urls.txt", encoding="utf-8") as fh: + links = [x.strip() for x in fh if x.strip() != ""] - print(bcolors.OKGREEN + - f'{len(links)} url loaded from urls.txt' + bcolors.ENDC) + print(bcolors.OKGREEN + f"{len(links)} url loaded from urls.txt" + bcolors.ENDC) - links = choices(links, k=len(links)*3) + links + links = choices(links, k=len(links) * 3) + links return links def load_search(): - print(bcolors.WARNING + 'Loading queries...' + bcolors.ENDC) + print(bcolors.WARNING + "Loading queries..." + bcolors.ENDC) + + with open("search.txt", encoding="utf-8") as fh: + search = [ + [y.strip() for y in x.strip().split("::::")] + for x in fh + if x.strip() != "" and "::::" in x + ] - with open('search.txt', encoding="utf-8") as fh: - search = [[y.strip() for y in x.strip().split('::::')] - for x in fh if x.strip() != '' and '::::' in x] + print( + bcolors.OKGREEN + f"{len(search)} query loaded from search.txt" + bcolors.ENDC + ) - print(bcolors.OKGREEN + - f'{len(search)} query loaded from search.txt' + bcolors.ENDC) + search = choices(search, k=len(search) * 3) + search - search = choices(search, k=len(search)*3) + search - return search diff --git a/youtubeviewer/website.py b/youtubeviewer/website.py index 715b41b..6e9320f 100644 --- a/youtubeviewer/website.py +++ b/youtubeviewer/website.py @@ -21,6 +21,7 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ + import calendar import sqlite3 import warnings @@ -28,134 +29,170 @@ from datetime import date, datetime, timedelta from flask import Flask, jsonify, render_template, request +from werkzeug.serving import run_simple warnings.filterwarnings("ignore", category=Warning) -MONTHS = ['January', 'February', 'March', 'April', 'May', 'June', - 'July', 'August', 'September', 'October', 'November', 'December'] +MONTHS = [ + "January", + "February", + "March", + "April", + "May", + "June", + "July", + "August", + "September", + "October", + "November", + "December", +] console = [] -summary_table = '' -html_table = '' -database = 'database.db' +summary_table = "" +html_table = "" +database = "database.db" def create_graph_data(dropdown_text): + """Generate graph data based on dropdown selection.""" now = datetime.now() - day = now.day - month = now.month - year = now.year today = now.date() - days = False - try: - number = [int(s) for s in dropdown_text.split() if s.isdigit()][0] - except Exception: - number = False - - if number: - if dropdown_text.startswith('Last'): - days = number - else: - month = MONTHS.index(dropdown_text[:-5]) + 1 - year = number - else: - month = MONTHS.index(dropdown_text) + 1 - query_days = [] + try: + days = ( + int([int(s) for s in dropdown_text.split() if s.isdigit()][0]) + if "Last" in dropdown_text + else None + ) + except (ValueError, IndexError): + days = None + if days: - for i in range(days): - day = today - timedelta(days=i) - query_days.append(str(day)) + query_days = [(today - timedelta(days=i)).isoformat() for i in range(days)] query_days.reverse() - else: - num_days = calendar.monthrange(year, month)[1] - for day in range(1, num_days+1): - query_days.append(str(date(year, month, day))) - - first_date = query_days[0] - last_date = query_days[-1] - - graph_data = [['Date', 'Views']] - total = 0 + try: + if " " in dropdown_text: + month, year = dropdown_text.rsplit(" ", 1) + month = MONTHS.index(month) + 1 + year = int(year) + else: + month = MONTHS.index(dropdown_text) + 1 + year = now.year + num_days = calendar.monthrange(year, month)[1] + query_days = [ + date(year, month, day).isoformat() for day in range(1, num_days + 1) + ] + except Exception as e: + print(f"Error parsing dropdown text '{dropdown_text}': {e}") + return [], 0, None, None + + graph_data = [["Date", "Views"]] + total_views = 0 try: with closing(sqlite3.connect(database, timeout=30)) as connection: with closing(connection.cursor()) as cursor: - for i in query_days: - view = cursor.execute( - "SELECT view FROM statistics WHERE date = ?", (i,),).fetchall() - if view: - graph_data.append([i[-2:], view[0][0]]) - total += view[0][0] - else: - graph_data.append([i[-2:], 0]) - except Exception: - pass + for query_date in query_days: + result = cursor.execute( + "SELECT view FROM statistics WHERE date = ?", (query_date,) + ).fetchone() + views = result[0] if result else 0 + graph_data.append([query_date[-2:], views]) + total_views += views + except sqlite3.Error as e: + print(f"Database error: {e}") - return graph_data, total, first_date, last_date + return graph_data, total_views, query_days[0], query_days[-1] def create_dropdown_data(): - dropdown = ['Last 7 days', 'Last 28 days', 'Last 90 days'] + """Create dropdown data for the UI.""" + dropdown = ["Last 7 days", "Last 28 days", "Last 90 days"] now = datetime.now() current_year = now.year + dropdown.append(now.strftime("%B")) - for _ in range(0, 12): + for _ in range(12): now = now.replace(day=1) - timedelta(days=1) - if current_year == now.year: - dropdown.append(now.strftime("%B")) - else: - dropdown.append(now.strftime("%B %Y")) + dropdown.append( + now.strftime("%B %Y") if now.year < current_year else now.strftime("%B") + ) return dropdown def shutdown_server(): - func = request.environ.get('werkzeug.server.shutdown') - if func is None: - raise RuntimeError('Not running with the Werkzeug Server') - func() + """Shut down the Flask server.""" + try: + func = request.environ.get("werkzeug.server.shutdown") + if func is None: + # For newer versions of Werkzeug + raise RuntimeError("Not running with Werkzeug Server") + func() + except Exception as e: + print(f"Error shutting down server: {e}") def start_server(host, port, debug=False): - app = Flask(__name__, - static_url_path='', - static_folder='web/static', - template_folder='web/templates') - - @app.route('/') + """Start the Flask server.""" + app = Flask( + __name__, + static_url_path="", + static_folder="web/static", + template_folder="web/templates", + ) + + @app.route("/") def home(): dropdown = create_dropdown_data() - return render_template('homepage.html', dropdownitems=dropdown) + return render_template("homepage.html", dropdownitems=dropdown) - @app.route('/update', methods=['POST']) + @app.route("/update", methods=["POST"]) def update(): - return jsonify({'result': 'success', 'console': console[:200], 'summary': summary_table[8:-9], 'table': html_table[8:-9]}) - - @app.route('/graph', methods=['GET', 'POST']) + return jsonify( + { + "result": "success", + "console": console[:200], + "summary": summary_table[8:-9], + "table": html_table[8:-9], + } + ) + + @app.route("/graph", methods=["GET", "POST"]) def graph(): - query = None - if request.method == 'POST': - query = request.json['query'] - graph_data, total, first_date, last_date = create_graph_data(query) - - return jsonify({ - 'graph_data': graph_data, - 'total': total, - 'first': first_date, - 'last': last_date - }) - - @app.route('/shutdown', methods=['POST']) + if request.method == "POST": + try: + query = request.json.get("query") + graph_data, total, first_date, last_date = create_graph_data(query) + return jsonify( + { + "graph_data": graph_data, + "total": total, + "first": first_date, + "last": last_date, + } + ) + except Exception as e: + return jsonify({"error": f"Failed to generate graph: {e}"}), 500 + return jsonify({"error": "Invalid request method"}), 405 + + @app.route("/shutdown", methods=["POST"]) def shutdown(): - shutdown_server() - return 'Server shutting down...' - - app.run(host=host, port=port, debug=debug) + try: + shutdown_server() + return "Server shutting down..." + except RuntimeError as e: + return jsonify({"error": str(e)}), 500 + + if debug: + app.run(host=host, port=port, debug=debug) + else: + run_simple(host, port, app, threaded=True) -if __name__ == '__main__': - start_server(host='0.0.0.0', port=5000, debug=True) +if __name__ == "__main__": + start_server(host="0.0.0.0", port=5000, debug=True)