• XSS.stack #1 – первый литературный журнал от юзеров форума

чекер сайтов на наличие вордпресс

PPoe

RAID-массив
Пользователь
Регистрация
27.10.2024
Сообщения
53
Реакции
40
Привет XXS! Сегодня мы начнем проверять большой список доменов на наличие вордпресс.
(Статья будет максимально короткая и минимально информативная)
На днях я столкнулся с нуждой парсить сайты и найти все на wordpress, пришлось писать небольшую программку, может ее кто доработает и скинет мне код с интересной cve или 0day)
тааак, что то я затянул!
сама программа:
Python:
import requests
import argparse
from urllib.parse import urlparse
from concurrent.futures import ThreadPoolExecutor
import socket
import struct
import threading
import os
import time


write_lock = threading.Lock()

def check_site(url, output_file):
    try:
        
        if not url.startswith(('http://', 'https://')):
            test_url = 'http://' + url
        else:
            test_url = url
            
        parsed = urlparse(test_url)
        base_url = f"{parsed.scheme}://{parsed.netloc}"
        
       
        wp_urls = [
            f"{base_url}/wp-login.php",
            f"{base_url}/wp-content/",
            f"{base_url}/wp-json/",
            f"{base_url}/readme.html"
        ]
        
        is_wordpress = False
        for wp_url in wp_urls:
            try:
                response = requests.get(wp_url, timeout=10, allow_redirects=True)
                if response.status_code == 200:
                  
                    if "wp-content" in response.text or "WordPress" in response.text:
                        is_wordpress = True
                        break
            except:
                continue
        
        if not is_wordpress:
            print(f"[-] Not WordPress: {base_url}")
            return
        
      
        cloudflare_indicators = [
            "cloudflare", "cf-ray", "__cfduid",
            "__cf_bm", "cf-cache-status", "cloudflare-nginx"
        ]
        
        try:
            response = requests.get(base_url, timeout=10)
            headers = str(response.headers).lower()
            using_cloudflare = any(cf in headers for cf in cloudflare_indicators)
            
           
            if not using_cloudflare:
                try:
                    ip = socket.gethostbyname(parsed.netloc)
                    
                    if is_cloudflare_ip(ip):
                        using_cloudflare = True
                except:
                    pass
        except Exception as e:
            print(f"[!] Error checking Cloudflare for {base_url}: {str(e)}")
            return
        
        if using_cloudflare:
            print(f"[-] Using Cloudflare: {base_url}")
            return
        
       
        print(f"[+] Found WordPress without Cloudflare: {base_url}")
        
        
        with write_lock:
            try:
               
                with open(output_file, 'a') as f:
                    f.write(f"{base_url}\n")
                    
                    f.flush()
                    
                    os.fsync(f.fileno())
                print(f"[✓] Saved to file: {base_url}")
            except Exception as e:
                print(f"[!] Failed to save {base_url}: {str(e)}")
        
    except Exception as e:
        print(f"[!] Error processing {url}: {str(e)}")

def is_cloudflare_ip(ip):
    """Проверяет принадлежит ли IP к Cloudflare"""
    try:
        
        ip_num = struct.unpack("!I", socket.inet_aton(ip))[0]
        
        
        cloudflare_ranges = [
            ("173.245.48.0", "173.245.63.255"),
            ("103.21.244.0", "103.21.247.255"),
            ("103.22.200.0", "103.22.203.255"),
            ("103.31.4.0", "103.31.7.255"),
            ("141.101.64.0", "141.101.127.255"),
            ("108.162.192.0", "108.162.255.255"),
            ("190.93.240.0", "190.93.255.255"),
            ("188.114.96.0", "188.114.111.255"),
            ("197.234.240.0", "197.234.243.255"),
            ("198.41.128.0", "198.41.255.255"),
            ("162.158.0.0", "162.159.255.255"),
            ("104.16.0.0", "104.31.255.255")
        ]
        
        for start, end in cloudflare_ranges:
            start_num = struct.unpack("!I", socket.inet_aton(start))[0]
            end_num = struct.unpack("!I", socket.inet_aton(end))[0]
            if start_num <= ip_num <= end_num:
                return True
                
    except:
        pass
        
    return False

def main():
    parser = argparse.ArgumentParser(description='Find WordPress sites without Cloudflare')
    parser.add_argument('-i', '--input', required=True, help='Input file with URLs')
    parser.add_argument('-o', '--output', default='wordpress_no_cloudflare.txt', help='Output file')
    parser.add_argument('-t', '--threads', type=int, default=10, help='Number of threads')
    
    args = parser.parse_args()
    
   
    with open(args.output, 'w') as f:
        f.write("")
    
    print(f"Starting scan of {args.input} with {args.threads} threads")
    print(f"Results will be saved in real-time to: {args.output}\n")
    
    with open(args.input, 'r') as f:
        urls = [line.strip() for line in f.readlines() if line.strip()]
    
    print(f"Loaded {len(urls)} URLs to check\n")
    
    
    with ThreadPoolExecutor(max_workers=args.threads) as executor:
        
        futures = []
        for url in urls:
            future = executor.submit(check_site, url, args.output)
            futures.append(future)
        
        
        for future in futures:
            try:
                future.result()
            except Exception as e:
                print(f"[!] Thread error: {str(e)}")
    
    print("\n[+] Scan completed! All results saved.")

if __name__ == "__main__":
    main()
Она ищет сайты из списка доменов с вордпресом и без cloudflare
собственно на этом все, надеюсь мой кривой говнокод будет хоть комуто полезен
 
я сооброжаю мало , но для забавы можно было бы сделать парсинг по диапозону ип адресов и их доменов с последующим чекером на наличие wp (многопоточность привествуется) ну и поставить это дело на тачку с сохранением логов) и заходить через пару дней
 
Ммм, вайбкодинг
1750362837450.jpeg
 
Пожалуйста, обратите внимание, что пользователь заблокирован
Дополнил :zns3:
Покажет версию, работает асинхронно и базово укажет на возможные уязвимости. Надо заполнить признаки уязвимостей будет в будущем

Python:
import asyncio
import aiohttp
import argparse
from urllib.parse import urlparse
import socket
import struct
import threading
import os
import progressbar
import re
from aiohttp import ClientSession
import random


write_lock = threading.Lock()

CLOUDFLARE_RANGES = [
    ("173.245.48.0", "173.245.63.255"),
    ("103.21.244.0", "103.21.247.255"),
    ("103.22.200.0", "103.22.203.255"),
    ("103.31.4.0", "103.31.7.255"),
    ("141.101.64.0", "141.101.127.255"),
    ("108.162.192.0", "108.162.255.255"),
    ("190.93.240.0", "190.93.255.255"),
    ("188.114.96.0", "188.114.111.255"),
    ("197.234.240.0", "197.234.243.255"),
    ("198.41.128.0", "198.41.255.255"),
    ("162.158.0.0", "162.159.255.255"),
    ("104.16.0.0", "104.31.255.255"),
]

FALLBACK_USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Mobile/15E148 Safari/604.1"
]

def ip_to_int(ip):
    return struct.unpack("!I", socket.inet_aton(ip))[0]

def is_cloudflare_ip(ip):
    try:
        ip_num = ip_to_int(ip)
        for start, end in CLOUDFLARE_RANGES:
            start_num = ip_to_int(start)
            end_num = ip_to_int(end)
            if start_num <= ip_num <= end_num:
                return True
    except:
        pass
    return False

try:
    from fake_useragent import UserAgent
except ImportError:
    class UserAgent:
        def random(self):
            raise ImportError("fake_useragent module not installed. Please install it using 'pip install fake-useragent'.")

def get_random_user_agent():
    try:
        ua = UserAgent()
        return ua.random
    except:
        return random.choice(FALLBACK_USER_AGENTS)

async def fetch(session, url, timeout=10):
    user_agent = get_random_user_agent()
    headers = {"User-Agent": user_agent}
    try:
        async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=timeout)) as response:
            text = await response.text()
            return {"status": response.status, "text": text, "headers": dict(response.headers)}
    except:
        return None

async def check_wordpress(session, base_url):
    wp_indicators = [
        "/wp-login.php", "/wp-content/", "/wp-json/", "/readme.html", "/xmlrpc.php"
    ]
    for indicator in wp_indicators:
        url = base_url + indicator
        response = await fetch(session, url)
        if response and response["status"] == 200:
            text = response["text"].lower()
            if any(keyword in text for keyword in ["wp-", "wordpress"]):
                return True
    return False

async def get_wp_version(session, base_url):
    version_patterns = [
        r"wordpress\s+([\d.]+)", r"ver=([\d.]+)", r"generator\" content=\"wordpress ([\d.]+)\""
    ]
    response = await fetch(session, base_url)
    if response and response["status"] == 200:
        text = response["text"].lower()
        for pattern in version_patterns:
            match = re.search(pattern, text)
            if match:
                return match.group(1)
    return "Unknown"

async def check_vulnerabilities(session, base_url):
    vuln_indicators = {
        "Old Version": ["ver=4.", "ver=3."],
        "Debug Mode": ["wp-config.php", "debug.log"],
        "Exposed Admin": ["/wp-admin/"],
    }
    vulns = []
    for key, indicators in vuln_indicators.items():
        for indicator in indicators:
            response = await fetch(session, base_url + indicator)
            if response and response["status"] == 200:
                if any(ind in response["text"].lower() for ind in indicators):
                    vulns.append(key)
    return vulns if vulns else ["None detected"]

async def check_cloudflare(session, base_url, parsed):
    cloudflare_indicators = [
        "cloudflare", "cf-ray", "__cfduid", "__cf_bm", "cf-cache-status", "cloudflare-nginx"
    ]
    response = await fetch(session, base_url)
    if response:
        headers = str(response["headers"]).lower()
        if any(cf in headers for cf in cloudflare_indicators):
            return True
        ip = await asyncio.get_event_loop().run_in_executor(None, socket.gethostbyname, parsed.netloc)
        if is_cloudflare_ip(ip):
            return True
    return False

async def process_url(url, output_file, progress, semaphore):
    async with semaphore:
        if not url.startswith(('http://', 'https://')):
            test_url = 'http://' + url
        else:
            test_url = url
        parsed = urlparse(test_url)
        base_url = f"{parsed.scheme}://{parsed.netloc}"

        async with ClientSession() as session:
            try:
                is_wp = await check_wordpress(session, base_url)
                if not is_wp:
                    print(f"[-] Not WordPress: {base_url}")
                    progress.update(1)
                    return

                uses_cf = await check_cloudflare(session, base_url, parsed)
                if uses_cf:
                    print(f"[-] Using Cloudflare: {base_url}")
                    progress.update(1)
                    return

                version = await get_wp_version(session, base_url)
                vulns = await check_vulnerabilities(session, base_url)

                result = f"[+] WordPress without Cloudflare: {base_url} (Version: {version}, Vulns: {', '.join(vulns)})"
                print(result)
                with write_lock:
                    with open(output_file, 'a') as f:
                        f.write(f"{base_url} | {version} | {', '.join(vulns)}\n")
                        f.flush()
                        os.fsync(f.fileno())
            except Exception as e:
                print(f"[!] Error processing {url}: {str(e)}")
            finally:
                progress.update(1)

async def main():
    parser = argparse.ArgumentParser(description='Advanced WordPress Scanner with Random User-Agents')
    parser.add_argument('-i', '--input', required=True, help='Input file with URLs')
    parser.add_argument('-o', '--output', default='wordpress_scan_results.txt', help='Output file')
    parser.add_argument('-t', '--threads', type=int, default=10, help='Number of concurrent tasks')
   
    args = parser.parse_args()
   
    with open(args.output, 'a') as f:
        f.write("URL | Version | Vulnerabilities\n")
   
    print(f"Starting scan of {args.input} with {args.threads} concurrent tasks")
    print(f"Results will be saved in real-time to: {args.output}\n")
   
    with open(args.input, 'r') as f:
        urls = [line.strip() for line in f.readlines() if line.strip()]
   
    print(f"Loaded {len(urls)} URLs to check\n")
   
    progress = progressbar.ProgressBar(maxval=len(urls))
    progress.start()
    semaphore = asyncio.Semaphore(args.threads)
   
    tasks = [process_url(url, args.output, progress, semaphore) for url in urls]
    await asyncio.gather(*tasks)
   
    progress.finish()
    print("\n[+] Scan completed! All results saved.")

if __name__ == "__main__":
    asyncio.run(main())
 
Дополнил :zns3:
Покажет версию, работает асинхронно и базово укажет на возможные уязвимости. Надо заполнить признаки уязвимостей будет в будущем

Python:
import asyncio
import aiohttp
import argparse
from urllib.parse import urlparse
import socket
import struct
import threading
import os
import progressbar
import re
from aiohttp import ClientSession
import random


write_lock = threading.Lock()

CLOUDFLARE_RANGES = [
    ("173.245.48.0", "173.245.63.255"),
    ("103.21.244.0", "103.21.247.255"),
    ("103.22.200.0", "103.22.203.255"),
    ("103.31.4.0", "103.31.7.255"),
    ("141.101.64.0", "141.101.127.255"),
    ("108.162.192.0", "108.162.255.255"),
    ("190.93.240.0", "190.93.255.255"),
    ("188.114.96.0", "188.114.111.255"),
    ("197.234.240.0", "197.234.243.255"),
    ("198.41.128.0", "198.41.255.255"),
    ("162.158.0.0", "162.159.255.255"),
    ("104.16.0.0", "104.31.255.255"),
]

FALLBACK_USER_AGENTS = [
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Safari/605.1.15",
    "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:89.0) Gecko/20100101 Firefox/89.0",
    "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.114 Safari/537.36",
    "Mozilla/5.0 (iPhone; CPU iPhone OS 14_6 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/14.1.1 Mobile/15E148 Safari/604.1"
]

def ip_to_int(ip):
    return struct.unpack("!I", socket.inet_aton(ip))[0]

def is_cloudflare_ip(ip):
    try:
        ip_num = ip_to_int(ip)
        for start, end in CLOUDFLARE_RANGES:
            start_num = ip_to_int(start)
            end_num = ip_to_int(end)
            if start_num <= ip_num <= end_num:
                return True
    except:
        pass
    return False

try:
    from fake_useragent import UserAgent
except ImportError:
    class UserAgent:
        def random(self):
            raise ImportError("fake_useragent module not installed. Please install it using 'pip install fake-useragent'.")

def get_random_user_agent():
    try:
        ua = UserAgent()
        return ua.random
    except:
        return random.choice(FALLBACK_USER_AGENTS)

async def fetch(session, url, timeout=10):
    user_agent = get_random_user_agent()
    headers = {"User-Agent": user_agent}
    try:
        async with session.get(url, headers=headers, timeout=aiohttp.ClientTimeout(total=timeout)) as response:
            text = await response.text()
            return {"status": response.status, "text": text, "headers": dict(response.headers)}
    except:
        return None

async def check_wordpress(session, base_url):
    wp_indicators = [
        "/wp-login.php", "/wp-content/", "/wp-json/", "/readme.html", "/xmlrpc.php"
    ]
    for indicator in wp_indicators:
        url = base_url + indicator
        response = await fetch(session, url)
        if response and response["status"] == 200:
            text = response["text"].lower()
            if any(keyword in text for keyword in ["wp-", "wordpress"]):
                return True
    return False

async def get_wp_version(session, base_url):
    version_patterns = [
        r"wordpress\s+([\d.]+)", r"ver=([\d.]+)", r"generator\" content=\"wordpress ([\d.]+)\""
    ]
    response = await fetch(session, base_url)
    if response and response["status"] == 200:
        text = response["text"].lower()
        for pattern in version_patterns:
            match = re.search(pattern, text)
            if match:
                return match.group(1)
    return "Unknown"

async def check_vulnerabilities(session, base_url):
    vuln_indicators = {
        "Old Version": ["ver=4.", "ver=3."],
        "Debug Mode": ["wp-config.php", "debug.log"],
        "Exposed Admin": ["/wp-admin/"],
    }
    vulns = []
    for key, indicators in vuln_indicators.items():
        for indicator in indicators:
            response = await fetch(session, base_url + indicator)
            if response and response["status"] == 200:
                if any(ind in response["text"].lower() for ind in indicators):
                    vulns.append(key)
    return vulns if vulns else ["None detected"]

async def check_cloudflare(session, base_url, parsed):
    cloudflare_indicators = [
        "cloudflare", "cf-ray", "__cfduid", "__cf_bm", "cf-cache-status", "cloudflare-nginx"
    ]
    response = await fetch(session, base_url)
    if response:
        headers = str(response["headers"]).lower()
        if any(cf in headers for cf in cloudflare_indicators):
            return True
        ip = await asyncio.get_event_loop().run_in_executor(None, socket.gethostbyname, parsed.netloc)
        if is_cloudflare_ip(ip):
            return True
    return False

async def process_url(url, output_file, progress, semaphore):
    async with semaphore:
        if not url.startswith(('http://', 'https://')):
            test_url = 'http://' + url
        else:
            test_url = url
        parsed = urlparse(test_url)
        base_url = f"{parsed.scheme}://{parsed.netloc}"

        async with ClientSession() as session:
            try:
                is_wp = await check_wordpress(session, base_url)
                if not is_wp:
                    print(f"[-] Not WordPress: {base_url}")
                    progress.update(1)
                    return

                uses_cf = await check_cloudflare(session, base_url, parsed)
                if uses_cf:
                    print(f"[-] Using Cloudflare: {base_url}")
                    progress.update(1)
                    return

                version = await get_wp_version(session, base_url)
                vulns = await check_vulnerabilities(session, base_url)

                result = f"[+] WordPress without Cloudflare: {base_url} (Version: {version}, Vulns: {', '.join(vulns)})"
                print(result)
                with write_lock:
                    with open(output_file, 'a') as f:
                        f.write(f"{base_url} | {version} | {', '.join(vulns)}\n")
                        f.flush()
                        os.fsync(f.fileno())
            except Exception as e:
                print(f"[!] Error processing {url}: {str(e)}")
            finally:
                progress.update(1)

async def main():
    parser = argparse.ArgumentParser(description='Advanced WordPress Scanner with Random User-Agents')
    parser.add_argument('-i', '--input', required=True, help='Input file with URLs')
    parser.add_argument('-o', '--output', default='wordpress_scan_results.txt', help='Output file')
    parser.add_argument('-t', '--threads', type=int, default=10, help='Number of concurrent tasks')
  
    args = parser.parse_args()
  
    with open(args.output, 'a') as f:
        f.write("URL | Version | Vulnerabilities\n")
  
    print(f"Starting scan of {args.input} with {args.threads} concurrent tasks")
    print(f"Results will be saved in real-time to: {args.output}\n")
  
    with open(args.input, 'r') as f:
        urls = [line.strip() for line in f.readlines() if line.strip()]
  
    print(f"Loaded {len(urls)} URLs to check\n")
  
    progress = progressbar.ProgressBar(maxval=len(urls))
    progress.start()
    semaphore = asyncio.Semaphore(args.threads)
  
    tasks = [process_url(url, args.output, progress, semaphore) for url in urls]
    await asyncio.gather(*tasks)
  
    progress.finish()
    print("\n[+] Scan completed! All results saved.")

if __name__ == "__main__":
    asyncio.run(main())
ай лев :smile10:
 


Напишите ответ...
  • Вставить:
Прикрепить файлы
Верх