• XSS.stack #1 – первый литературный журнал от юзеров форума

модель нормального ПО

salsa20

(L2) cache
Пользователь
Регистрация
03.05.2019
Сообщения
498
Реакции
110
Гарант сделки
1
Вижу в этом плюс для тематики чтобы сделать модельку норм + плохую модельку из образцов малвари. Думаю полезная инфа.
Тут был на форуме тип который обучал модель но потёр посты.

Метод RELIEF для выбора наиболее важных признаков работает следующим образом:

1. Для каждого признака (например, n-граммы опкодов) задается вес равный 0.

2. Случайным образом выбирается опорный вектор x из обучающей выборки.

3. Находятся два его ближайших соседа:
- ближайший попадание (nearest hit) xh из того же класса
- ближайший промах (nearest miss) xm из другого класса

4. Обновляются веса для каждого признака по формуле:
wi = wi + (xi - xmi)2 - (xi - xhi)2

5. Повторяются шаги 2-4 много раз для разных случайно выбранных опорных векторов.

6. Выбираются nsf признаков с наибольшим весом - это наиболее важные признаки.

Построение модели нормального ПО на основе этих признаков осуществляется с помощью итеративного алгоритма кластеризации на базе машин опорных векторов, описанного в статье. Суть в том, чтобы разбить все векторы признаков незараженных файлов на несколько кластеров, соответствующих разным "нормальным" моделям поведения.

Затем для каждого кластера строится описание границы нормальных объектов с помощью машин опорных векторов или описаний опорных векторных данных. Это и есть итоговая модель нормального ПО.

Обобщенная инфа с pdf

В соответствии с инструкциями я проанализирую эту научную статью, чтобы определить ее основную идею, методологию, результаты и выводы.

Основная идея статьи заключается в использовании аномального подхода к обнаружению вредоносного ПО, основанного на анализе последовательностей опкодов. Авторы применяют n-граммные модели для извлечения важных признаков из этих последовательностей, а затем используют итеративный алгоритм группировки на основе машин опорных векторов в сочетании с описаниями опорных векторных данных для построения модели нормального поведения программного обеспечения.

В качестве методологии авторы извлекают последовательности опкодов из исполняемых файлов, применяют к ним n-граммные модели для получения векторов признаков, используют метод RELIEF для выбора наиболее важных признаков и строят модель нормального ПО с помощью итеративной группировки на основе машин опорных векторов и описаний опорных векторных данных.

Основными результатами являются построенная модель нормального поведения ПО и показанная эффективность предложенного метода обнаружения неизвестных ранее вредоносных программ. По сравнению с существующими аналогами предложенный алгоритм продемонстрировал более высокую точность классификации (97,09%).

К сожалению, в статье не указаны конкретные ошибки или неточности. Авторы лишь отмечают, что алгоритм требует значительных вычислительных ресурсов на этапе обучения при большом объеме обучающей выборки.

В целом, эта статья предлагает эффективный метод аномального обнаружения вредоносных программ на основе анализа последовательностей опкодов с помощью опорных векторов. Полученные результаты позволяют сделать вывод о применимости данного подхода для построения систем обнаружения неизвестных ранее угроз. Дальнейшие исследования могут быть направлены на оптимизацию алгоритма и включение в анализ параметров опкодов.

Таков мой анализ основной идеи, методологии, результатов и выводов данной научной статьи. Я готов ответить на любые дополнительные вопросы по ее содержанию.
 

Вложения

  • zolotukhin2014.pdf
    148.4 КБ · Просмотры: 21
Добавлю по теме
Увлекательно кодить инструмент для анализа, вот с++ сорсик который можно скомпилировать под DynamoRIO
Щас умеет искать ngrams для файла


Всё это мудрье нужно для анализа системных бинарников на легит, чтобы вы могли тренировать модельку свою. Либо в других целях.


Логируются также опкоды
opcode_stats.txt
BB has 12 instructions:
opcode 10 = 1 instructions (8.3%)
opcode 12 = 1 instructions (8.3%)
opcode 18 = 1 instructions (8.3%)
opcode 42 = 1 instructions (8.3%)
opcode 55 = 2 instructions (16.7%)
opcode 56 = 4 instructions (33.3%)
opcode 61 = 2 instructions (16.7%)
BB has 12 instructions:
opcode 10 = 1 instructions (8.3%)
opcode 18 = 3 instructions (25.0%)
opcode 55 = 4 instructions (33.3%)
opcode 56 = 2 instructions (16.7%)
opcode 60 = 1 instructions (8.3%)
opcode 156 = 1 instructions (8.3%)
BB has 6 instructions:
opcode 8 = 1 instructions (16.7%)
opcode 12 = 1 instructions (16.7%)
opcode 42 = 1 instructions (16.7%)
opcode 55 = 1 instructions (16.7%)
opcode 57 = 1 instructions (16.7%)
opcode 61 = 1 instructions (16.7%)
BB has 5 instructions:
opcode 12 = 1 instructions (20.0%)
opcode 18 = 1 instructions (20.0%)
opcode 55 = 1 instructions (20.0%)
opcode 60 = 1 instructions (20.0%)
opcode 156 = 1 instructions (20.0%)

Запуск
C:\DynamoRIO\dynamorio\build\bin64\drrun.exe -c C:\DynamoRIO\dynamorio\build\x64\Release\dr_opcode_stats.dll -- C:\app64.exe

C++:
#define WINDOWS
#define X86_64
#define _CRT_SECURE_NO_WARNINGS
#include <stdio.h>
#include <stdlib.h>
#include "dr_api.h"
#include <fstream>

#include <unordered_map>
#include <unordered_set>

#define NGRAM_N 3 // размер n-грамм
#define MAX_OPCODES 256

int *opcodes_ngrams = NULL;
int ngram_count = 0;
int opcodes_ngrams_size = 0;
int *opcode_stats = NULL;
int *ops = NULL;
std::unordered_set<int> unique_ngrams;

int
get_instrlist_length(instrlist_t *bb)
{
    int len = 0;
    for (instr_t *instr = instrlist_first(bb); instr != NULL;
         instr = instr_get_next(instr)) {

        len++;
    }
    return len;
}

void insert_ngram(const int *ngram, int n) {
    bool is_new_ngram = false;

    for (int i = 0; i < n; i++) {
        if (unique_ngrams.find(ngram[i]) == unique_ngrams.end()) {
            // Эта н-грамма еще не была добавлена, добавляем ее в уникальное множество
            unique_ngrams.insert(ngram[i]);
            is_new_ngram = true;
        }
    }

    if (is_new_ngram) {
        // Записываем только новые н-граммы в файл
        FILE *file = fopen("C:\\opcode_ngrams.log", "a");

        if (file) {
            fprintf(file, "Found ngrams: ");
            for (int i = 0; i < n; i++) {
                fprintf(file, "%d ", ngram[i]);
            }
            fprintf(file, "\n");
            fclose(file);
        }
    } else {
        // Если нет новых н-грамм, то можно не записывать "Found ngrams: " в лог
    }
}



void
log_ngrams()
{
  
}

static dr_emit_flags_t
code_block_event(void *drcontext, void *tag, instrlist_t *bb, bool for_trace,
                 bool translating)
{

    //int opcode_stats[256] = { 0 }; // Assuming a maximum of 256 opcodes
    //int ops[256];                  // Assuming a maximum of 256 opcodes
    int num_instrs = 0;

    instr_t *instr;

      int num_opcodes = get_instrlist_length(bb);

    if (num_opcodes <= MAX_OPCODES) {

          // используем предвыделенный буфер
          opcode_stats = (int *)dr_raw_mem_alloc(
              MAX_OPCODES * sizeof(int), DR_MEMPROT_READ | DR_MEMPROT_WRITE, NULL);
          ops = (int *)dr_raw_mem_alloc(MAX_OPCODES * sizeof(int),
                                        DR_MEMPROT_READ | DR_MEMPROT_WRITE, NULL);
    } else {

        dr_printf("WARNING: block exceeds max size, allocating dynamically");

        opcode_stats = (int *)dr_raw_mem_alloc(num_opcodes * sizeof(int),
                                               DR_MEMPROT_READ | DR_MEMPROT_WRITE, NULL);
        ops = (int *)dr_raw_mem_alloc(num_opcodes * sizeof(int),
                                      DR_MEMPROT_READ | DR_MEMPROT_WRITE, NULL);
    }

    for (instr = instrlist_first(bb); instr; instr = instr_get_next(instr)) {
        int opcode = instr_get_opcode(instr);
        opcode_stats[opcode]++;
        ops[num_instrs] = opcode;
        num_instrs++;
    }

    // Calculate n-grams
    for (int i = 0; i < num_instrs - NGRAM_N + 1; i++) {

        int ngram[NGRAM_N];

        for (int j = 0; j < NGRAM_N; j++) {
            ngram[j] = ops[i + j];
        }

        insert_ngram(ngram, NGRAM_N);
    }
    // Log n-grams
    log_ngrams();
      

    // Открываем файл для дописывания
    FILE *file = fopen("C:\\opcode_stats.txt", "a");

    // Проверяем, успешно ли открыт файл
    if (file) {
        // Записываем результаты в файл
        fprintf(file, "BB has %d instructions:\n", num_instrs);
        for (int i = 0; i < 256; i++) {
            if (opcode_stats[i] > 0) {
                float percent = 100.0f * opcode_stats[i] / num_instrs;
                fprintf(file, "opcode %2d = %6d instructions (%2.1f%%)\n", i,
                        opcode_stats[i], percent);
            }
        }

        // Закрываем файл
        fclose(file);
        printf("Results appended to opcode_stats.txt\n");
    } else {
        printf("Failed to open opcode_stats.txt for appending.\n");
    }
    
    // освобождаем выделенную память
    dr_raw_mem_free(opcode_stats, num_opcodes * sizeof(int));
    dr_raw_mem_free(ops, num_opcodes * sizeof(int));

    return DR_EMIT_DEFAULT;
}
DR_EXPORT void
dr_client_exit(void)
{
    printf("dr_client_exit started.\n");

    // Освобождаем память, выделенную для opcodes_ngrams
    if (opcodes_ngrams) {
        free(opcodes_ngrams);
    }
}
DR_EXPORT void
dr_client_main(client_id_t id, int argc, const char *argv[])
{
    printf("dr_client_main started.\n");

    dr_register_bb_event(code_block_event);
}

opcode_ngrams.txt
Found ngrams: 66 18 18
Found ngrams: 18 18 10
Found ngrams: 18 10 61
Found ngrams: 10 61 12
Found ngrams: 12 61 43
Found ngrams: 55 56 56
Found ngrams: 61 12 42
Found ngrams: 55 55 60
Found ngrams: 55 60 156
Found ngrams: 8 61 12
Found ngrams: 61 12 57
Found ngrams: 55 56 195
Found ngrams: 56 195 258
Found ngrams: 195 258 14
Found ngrams: 258 14 159
Found ngrams: 56 55 101
Found ngrams: 195 61 17
Found ngrams: 17 14 28
Found ngrams: 25 4 60
Found ngrams: 4 60 31
Found ngrams: 56 12 20
Found ngrams: 12 20 70
 
"Умный" дизассемблер с машинного кода в человеческий подобным способом наверное можно сделать. Как переводчик. А по бинарному коду определить плохой файл или нет - не получится. Тут нужно на более высоком уровне анализировать - скажем прога про себя заявляет, что она может хранить данные в облаке. А на деле при запуске зачем-то перебирает файлы в директории, где куки лежат, что-то наверное пакует в зип, потому что слинкована с соответствующей библиотекой, и для чего-то умеет аплоадить файлы на сервер. Значит явно что-то тут не то - это либо стилер, либо вредоносная шпионская программа - OneDrive.
 
"Умный" дизассемблер с машинного кода в человеческий подобным способом наверное можно сделать. Как переводчик. А по бинарному коду определить плохой файл или нет - не получится. Тут нужно на более высоком уровне анализировать - скажем прога про себя заявляет, что она может хранить данные в облаке. А на деле при запуске зачем-то перебирает файлы в директории, где куки лежат, что-то наверное пакует в зип, потому что слинкована с соответствующей библиотекой, и для чего-то умеет аплоадить файлы на сервер. Значит явно что-то тут не то - это либо стилер, либо вредоносная шпионская программа - OneDrive.
Почему "умный" дизассемблер не смог определить плохой файл?

Потому что он подумал, что программа, которая перебирает файлы, пакует их в зип, и аплоадит на сервер, просто очень увлечена археологией и хочет изучить древние файлы!
 
на питухоне попроще будет решать задачу

Python:
import sys
import pefile
from capstone import *

def extract_all_ngrams(binary_data):
    ngrams = set()  # Use a set to store unique n-grams
    for n in range(1, len(binary_data) + 1):
        for i in range(len(binary_data) - n + 1):
            ngram = binary_data[i:i + n]
            ngrams.add(' '.join(f'{b:02x}' for b in ngram))
    return ngrams

def disassemble_pe_sections(file_path):
    try:
        # Initialize Capstone disassembler for x86 architecture in 32-bit mode
        md = Cs(CS_ARCH_X86, CS_MODE_32)
        md.detail = True

        # Load the PE file
        pe = pefile.PE(file_path)

        # Initialize counters
        opcode_count = 0
        section_count = 0

        # Initialize a set to store unique n-grams
        unique_ngrams = set()

        # Iterate through sections and disassemble each one
        for section in pe.sections:
            print("Section: " + section.Name.decode('utf-8', 'ignore').rstrip('\x00'))
            binary_data = section.get_data()

            # Increment section count
            section_count += 1

            # Disassemble and print instructions
            for i in md.disasm(binary_data, section.VirtualAddress + pe.OPTIONAL_HEADER.ImageBase):
                # print("Offset:", hex(i.address - section.VirtualAddress))
                # print("Raw Opcode Bytes:", ' '.join(f'{b:02x}' for b in i.bytes))
                # print("Mnemonic:", i.mnemonic)
                # print()

                # Increment opcode count
                opcode_count += 1

                # Extract and add unique n-grams to the set
                ngrams = extract_all_ngrams(i.bytes)
                unique_ngrams.update(ngrams)

        # Print the counts
        print(f"Total Opcodes Processed: {opcode_count}")
        print(f"Total Sections Found: {section_count}")
        print(f"Unique N-grams: {len(unique_ngrams)}")

    except FileNotFoundError:
        print(f"Error: File not found: {file_path}")
        sys.exit(1)
    except CsError as e:
        print(f"Capstone Error: {e}")
        sys.exit(1)
    except Exception as e:
        print(f"Error: {e}")
        sys.exit(1)

def main():
    if len(sys.argv) < 2:
        print("Usage: python pe_disassembler.py <pe_file_path>")
        sys.exit(1)

    pe_file_path = sys.argv[1]

    print("Dissecting a PE file in x86...")
    disassemble_pe_sections(pe_file_path)

if __name__ == "__main__":
    main()

Продолжая работу получаем такое
Python:
import sys
import pefile
from capstone import *

def extract_ngrams(opcode_sequence, n):
    ngrams = set()
    opcode_list = opcode_sequence.split()
    for i in range(len(opcode_list) - n + 1):
        ngram = ' '.join(opcode_list[i:i + n])
        ngrams.add(ngram)
    return ngrams

def disassemble_pe_sections(file_path, n):
    try:
        # Инициализация дизассемблера Capstone для архитектуры x86 (32-бит)
        md = Cs(CS_ARCH_X86, CS_MODE_32)
        md.detail = True

        # Загрузка PE-файла
        pe = pefile.PE(file_path)

        # Инициализация списка для хранения последовательностей опкодов
        opcode_sequences = []

        # Итерация по секциям и дизассемблирование каждой из них
        for section in pe.sections:
            binary_data = section.get_data()

            # Дизассемблирование и создание последовательности опкодов
            opcode_sequence = ' '.join([i.mnemonic for i in md.disasm(binary_data, section.VirtualAddress + pe.OPTIONAL_HEADER.ImageBase)])

            # Добавление последовательности опкодов в список
            opcode_sequences.append(opcode_sequence)

        # Извлечение n-грамм из последовательностей опкодов
        ngrams = set()
        for opcode_sequence in opcode_sequences:
            ngrams.update(extract_ngrams(opcode_sequence, n))

        # Печать извлеченных n-грамм
        for ngram in ngrams:
            print(ngram)

    except FileNotFoundError:
        print(f"Ошибка: Файл не найден: {file_path}")
        sys.exit(1)
    except CsError as e:
        print(f"Ошибка Capstone: {e}")
        sys.exit(1)
    except Exception as e:
        print(f"Ошибка: {e}")
        sys.exit(1)

def main():
    pe_file_path2 = r"C:\DynamoRIO\ConsoleApplication1.exe"  # Укажите путь к PE-файлу x86 (32-бит)
    pe_file_path = r"C:\1.exe"  # Крипт

    n = 2  # Выберите значение n для извлечения n-грамм

    print(f"Извлечение {n}-грамм из PE-файла для архитектуры x86 (32-бит)...")
    disassemble_pe_sections(pe_file_path, n)

if __name__ == "__main__":
    main()


1701221505937.png
 
Последнее редактирование:
Обновлю код
Теперь чутка получше стало)
На основе данных можно тренить модель уже
Извлечение 2-грамм из PE-файла для архитектуры x86-64 (64 бит)...
Отсортированные частоты n-грамм:
add add: 6
mov add: 2
add mov: 2
or add: 2
mov call: 1
ja mov: 1
ret add: 1

Python:
import sys

import pefile
from capstone import *
import re
from collections import Counter


def opcode_stemmer(opcode):
    # Список шаблонов для замен
    patterns = [
        (r'^mov', 'mov'),
        (r'movabs|movsb', 'mov'),
        (r'^sub', 'sub'),
        (r'^push', 'push'),
        (r'^and', 'and'),
        (r'^cmp', 'cmp'),
        (r'^js', 'js'),
        (r'shl', 'shl'),
        (r'^add', 'add'),
        (r'^pop', 'pop')
    ]

    # Ищем подходящий шаблон и возвращаем стем
    for pattern, stem in patterns:
        if re.match(pattern, opcode):
            return stem

    # Если не нашли - возвращаем opcode без изменений
    return opcode


def extract_ngrams(opcode_sequence, n):
    ngrams = set()
    opcode_list = opcode_sequence.split()
    for i in range(len(opcode_list) - n + 1):
        ngram = ' '.join(opcode_list[i:i + n])
        ngrams.add(ngram)
    return ngrams

def disassemble_pe_sections(file_path, n):
    try:
        # Инициализация дизассемблера Capstone для архитектуры x86-64
        md = Cs(CS_ARCH_X86, CS_MODE_64)
        md.detail = True

        # Загрузка PE-файла
        pe = pefile.PE(file_path)

        # Инициализация списка для хранения последовательностей опкодов
        opcode_sequences = []

        # Итерация по секциям и дизассемблирование каждой из них
        for section in pe.sections:
            binary_data = section.get_data()

            # Дизассемблирование и создание последовательности опкодов
            opcode_sequence = ' '.join([opcode_stemmer(i.mnemonic) for i in
                                        md.disasm(binary_data, section.VirtualAddress + pe.OPTIONAL_HEADER.ImageBase)])
            # Добавление последовательности опкодов в список
            opcode_sequences.append(opcode_sequence)

        # Считаем частоты n-грамм с помощью collections.Counter
        ngram_counts = Counter()
        for opcode_sequence in opcode_sequences:
            ngram_counts.update(extract_ngrams(opcode_sequence, n))

        # Печать отсортированных частот n-грамм
        sorted_ngram_counts = ngram_counts.most_common()
        print("Отсортированные частоты n-грамм:")
        for ngram, freq in sorted_ngram_counts:
            print(f"{ngram}: {freq}")

    except FileNotFoundError:
        print(f"Ошибка: Файл не найден: {file_path}")
        sys.exit(1)
    except CsError as e:
        print(f"Ошибка Capstone: {e}")
        sys.exit(1)
    except Exception as e:
        print(f"Ошибка: {e}")
        sys.exit(1)

def main():
    pe_file_path = r"C:\work\t\ConsoleApplication1_messagebox_64_0x0000B72E45E93254.exe"
    n = 2  # Выберите значение n для извлечения n-грамм

    print(f"Извлечение {n}-грамм из PE-файла для архитектуры x86-64 (64 бит)...")
    disassemble_pe_sections(pe_file_path, n)

if __name__ == "__main__":
    main()


если есть желание пойти дальше

Python:
opcode_sequence = set()
            for i in md.disasm(binary_data, section.VirtualAddress + pe.OPTIONAL_HEADER.ImageBase):

                opcode = (i.mnemonic) + ' ' + i.op_str
                opcode_sequence.add(opcode)
            opcode_sequence = ' '.join(opcode_sequence)

Отсортированные частоты n-грамм:
qword ptr: 4389
[rip +: 4138
ptr [rip: 3397
dword ptr: 1767
call qword: 1748
ptr [rbp: 1552
[rbp +: 1199
mov qword: 843
 
Последнее редактирование:
Обновленный скрипт

Python:
import sys
import pefile
from capstone import *
import re
from collections import Counter


def opcode_stemmer(opcode):
    # Список шаблонов для замен
    patterns = [
        (r'^mov', 'mov'),
        (r'movabs|movsb', 'mov'),
        (r'^sub', 'sub'),
        (r'^push', 'push'),
        (r'^and', 'and'),
        (r'^cmp', 'cmp'),
        (r'^js', 'js'),
        (r'shl', 'shl'),
        (r'^add', 'add'),
        (r'^pop', 'pop')
    ]

    # Ищем подходящий шаблон и возвращаем стем
    for pattern, stem in patterns:
        if re.match(pattern, opcode):
            return stem

    # Если не нашли - возвращаем opcode без изменений
    return opcode


def disassemble_pe_sections(file_path, n):
    try:
        # Инициализация дизассемблера Capstone для архитектуры x86-64
        md = Cs(CS_ARCH_X86, CS_MODE_64)
        md.detail = True

        # Загрузка PE-файла
        pe = pefile.PE(file_path)

        # Инициализация списка для хранения последовательностей опкодов
        opcode_sequences = []

        # Итерация по секциям и дизассемблирование каждой из них
        for section in pe.sections:
            binary_data = section.get_data()

            # Дизассемблирование и создание последовательности опкодов
            opcode_sequence = []
            for i in md.disasm(binary_data, section.VirtualAddress + pe.OPTIONAL_HEADER.ImageBase):
                opcode = (i.mnemonic) + ' ' + i.op_str
                opcode_sequences.append(opcode)

        # Считаем частоты n-грамм с помощью collections.Counter
        ngram_counts = Counter()
        for opcode_sequence in opcode_sequences:
            # Skip empty sequences
            if not opcode_sequence:
                continue

            opcode_list = opcode_sequence.split()
            for i in range(len(opcode_list) - n + 1):
                ngram = ' '.join(opcode_list[i:i + n])

                # Skip n-grams containing "[" or "+"
                if "[" in ngram or "+" in ngram:
                    continue

                ngram_counts[ngram] += 1

        # Печать отсортированных частот n-грамм
        sorted_ngram_counts = sorted(ngram_counts.items(), key=lambda x: x[1], reverse=True)
        print("Отсортированные частоты n-грамм:")
        for ngram, freq in sorted_ngram_counts:
            print(f"{ngram}: {freq}")

    except FileNotFoundError:
        print(f"Ошибка: Файл не найден: {file_path}")
        sys.exit(1)
    except CsError as e:
        print(f"Ошибка Capstone: {e}")
        sys.exit(1)
    except Exception as e:
        print(f"Ошибка: {e}")
        sys.exit(1)


def main():
    pe_file_path = r"C:\Windows\notepad.exe"
    n = 3  # Выберите значение n для извлечения n-грамм

    print(f"Извлечение {n}-грамм из PE-файла для архитектуры x86-64 (64 бит)...")
    disassemble_pe_sections(pe_file_path, n)


if __name__ == "__main__":
    main()
Извлечение 3-грамм из PE-файла для архитектуры x86-64 (64 бит)...
Отсортированные частоты n-грамм:
mov qword ptr: 1962
call qword ptr: 1767
nop dword ptr: 1349
mov dword ptr: 1303
rax, qword ptr: 983
rcx, qword ptr: 949
mov rax, qword: 938
mov rcx, qword: 920
 
дополняю

Python:
import sys
import pefile
from capstone import *
import re
from collections import Counter

import nltk
from sklearn.feature_extraction.text import CountVectorizer

def opcode_stemmer(opcode):
    # Список шаблонов для замен
    patterns = [
        (r'^mov', 'mov'),
        (r'movabs|movsb', 'mov'),
        (r'^sub', 'sub'),
        (r'^push', 'push'),
        (r'^and', 'and'),
        (r'^cmp', 'cmp'),
        (r'^js', 'js'),
        (r'shl', 'shl'),
        (r'^add', 'add'),
        (r'^pop', 'pop')
    ]

    # Ищем подходящий шаблон и возвращаем стем
    for pattern, stem in patterns:
        if re.match(pattern, opcode):
            return stem

    # Если не нашли - возвращаем opcode без изменений
    return opcode


def disassemble_pe_sections(file_path, n):
    try:
        # Инициализация дизассемблера Capstone для архитектуры x86-64
        md = Cs(CS_ARCH_X86, CS_MODE_64)
        md.detail = True

        # Загрузка PE-файла
        pe = pefile.PE(file_path)

        # Инициализация списка для хранения последовательностей опкодов
        opcode_sequences = []

        # Итерация по секциям и дизассемблирование каждой из них
        for section in pe.sections:
            binary_data = section.get_data()

            # Дизассемблирование и создание последовательности опкодов
            opcode_sequence = []
            for i in md.disasm(binary_data, section.VirtualAddress + pe.OPTIONAL_HEADER.ImageBase):
                opcode = (i.mnemonic) + ' ' + i.op_str
                opcode_sequences.append(opcode)

        # Создаем новый список для хранения групп по три элемента
        result_list = []

        # Используем цикл для перебора элементов и добавления их в новый список
        for i in range(0, len(opcode_sequences), 3):
            # Получаем три элемента с помощью среза
            batch = opcode_sequences[i:i + 3]

            # Добавляем эту группу в новый список
            result_list.append(' | '.join(batch))

        ngram_counts = Counter()
        for opcode_sequence in result_list:
            # Разбиваем строку на инструкции, используя разделитель "|"
            instructions = opcode_sequence.split(" | ")
            n = 2  # Здесь устанавливаем желаемую длину n-граммы
            # Создаем словарь для подсчета 2-грамм

            # Skip empty sequences
            if not opcode_sequence:
                continue

            opcode_list = opcode_sequence
            for i in range(len(instructions) - n + 1):
                ngram = ' | '.join(instructions[i:i + n])

                # Пропустить n-граммы, содержащие "[" или "+"
                # if "[" in ngram or "+" in ngram:
                #     continue

                # Добавляем 2-грамму в словарь или увеличиваем счетчик
                if ngram not in ngram_counts:
                    ngram_counts[ngram] = 1
                else:
                    ngram_counts[ngram] += 1



        # Печать отсортированных частот n-грамм
        sorted_ngram_counts = sorted(ngram_counts.items(), key=lambda x: x[1], reverse=True)
        print("Отсортированные частоты n-грамм:")
        for ngram, freq in sorted_ngram_counts:
            print(f"{ngram}: {freq}")

        # first_elements = [tup[0] for tup in sorted_ngram_counts]
        # # Преобразование текстовых данных в 2-граммы
        # nltk.download('punkt')
        # ngram_sequences = []
        # for opcode_sequence in first_elements:
        #     tokens = nltk.word_tokenize(opcode_sequence)
        #     ngrams = [' '.join(tokens[i:i + 2]) for i in range(len(tokens) - 1)]
        #     ngram_sequences.append(' '.join(ngrams))
        #
        # # Создание частотных векторов
        # vectorizer = CountVectorizer()
        # X = vectorizer.fit_transform(ngram_sequences)
        #
        # # Получение матрицы частотных векторов
        # frequency_vectors = X.toarray()
        #
        # # Вывод частотных векторов
        # for i, opcode_sequence in enumerate(first_elements):
        #     print(f"Частотный вектор для программы {i + 1}: {frequency_vectors[i]}")

    except FileNotFoundError:
        print(f"Ошибка: Файл не найден: {file_path}")
        sys.exit(1)
    except CsError as e:
        print(f"Ошибка Capstone: {e}")
        sys.exit(1)
    except Exception as e:
        print(f"Ошибка: {e}")
        sys.exit(1)


def main():
    #pe_file_path = r"C:\work\t\ConsoleApplication1_messagebox_64_0x0000CDF3800AA52C.exe"
    pe_file_path = r"C:\Windows\notepad.exe"
    n = 3  # Выберите значение n для извлечения n-грамм

    print(f"Извлечение {n}-грамм из PE-файла для архитектуры x86-64 (64 бит)...")
    disassemble_pe_sections(pe_file_path, n)


if __name__ == "__main__":
    main()

тут уже немного дальше пошел
как видите извлекатор умнеет на глазах)) теперь ток обучать модель
Извлечение 3-грамм из PE-файла для архитектуры x86-64 (64 бит)...
Отсортированные частоты n-грамм:
int3 | int3 : 2780
ret | int3 : 251
xor edx, edx | mov rcx, rax: 76
pop rdi | ret : 67
nop dword ptr [rax + rax] | test eax, eax: 62


по частотным векторам там инфа такого характера уже пойдет!
1701742848831.png



Дальше уже дело за малым
1701744631147.png


анализ ms office x64 идет нормально)
1701746654444.png
 
Последнее редактирование:
Итак подведу итоги (для 64 бит, для 32бит сильно парится не надо, пару строчек поменять только)

Главный скрипт чтобы собрать данные для модели
Python:
import multiprocessing
import os
import sys
import pefile
from capstone import *
import re
from collections import Counter

def opcode_stemmer(opcode):
    # Список шаблонов для замен
    patterns = [
        (r'^mov', 'mov'),
        (r'movabs|movsb', 'mov'),
        (r'^sub', 'sub'),
        (r'^push', 'push'),
        (r'^and', 'and'),
        (r'^cmp', 'cmp'),
        (r'^js', 'js'),
        (r'shl', 'shl'),
        (r'^add', 'add'),
        (r'^pop', 'pop')
    ]

    # Ищем подходящий шаблон и возвращаем стем
    for pattern, stem in patterns:
        if re.match(pattern, opcode):
            return stem

    # Если не нашли - возвращаем opcode без изменений
    return opcode


def disassemble_pe_sections(file_path, n):
    try:
        # Инициализация дизассемблера Capstone для архитектуры x86-64
        md = Cs(CS_ARCH_X86, CS_MODE_64)
        md.detail = True

        # Загрузка PE-файла
        pe = pefile.PE(file_path)

        # Инициализация списка для хранения последовательностей опкодов
        opcode_sequences = []

        # Итерация по секциям и дизассемблирование каждой из них
        for section in pe.sections:
            binary_data = section.get_data()

            # Дизассемблирование и создание последовательности опкодов
            opcode_sequence = []
            for i in md.disasm(binary_data, section.VirtualAddress + pe.OPTIONAL_HEADER.ImageBase):
                opcode = (i.mnemonic) + ' ' + i.op_str
                opcode_sequences.append(opcode)

        # Создаем новый список для хранения групп по три элемента
        result_list = []

        # Используем цикл для перебора элементов и добавления их в новый список
        for i in range(0, len(opcode_sequences), 3):
            # Получаем три элемента с помощью среза
            batch = opcode_sequences[i:i + 3]

            # Добавляем эту группу в новый список
            result_list.append(' | '.join(batch))

        ngram_counts = Counter()
        for opcode_sequence in result_list:
            # Разбиваем строку на инструкции, используя разделитель "|"
            instructions = opcode_sequence.split(" | ")
            n = 2  # Здесь устанавливаем желаемую длину n-граммы

            opcode_list = opcode_sequence
            for i in range(len(instructions) - n + 1):
                ngram = ' | '.join(instructions[i:i + n])

                # Пропустить n-граммы, содержащие "[" или "+"
                # if "[" in ngram or "+" in ngram:
                #     continue

                # Добавляем 2-грамму в словарь или увеличиваем счетчик
                if ngram not in ngram_counts:
                    ngram_counts[ngram] = 1
                else:
                    ngram_counts[ngram] += 1

        # Печать отсортированных частот n-грамм
        sorted_ngram_counts = sorted(ngram_counts.items(), key=lambda x: x[1], reverse=True)

        def save_sorted_ngram_counts_to_file(sorted_ngram_counts, output_folder, output_file_name):
            try:
                # Объединяем путь к папке и имя файла
                output_path = os.path.join(output_folder, output_file_name)

                with open(output_path, "w") as file:
                    for ngram, freq in sorted_ngram_counts:
                        file.write(f"{ngram}: {freq}\n")
                print(f"Данные успешно сохранены в файл: {output_path}")
            except Exception as e:
                print(f"Ошибка при сохранении данных в файл: {e}")

        file_name = os.path.basename(file_path)
        output_folder = r"C:\Users\WORKER\PycharmProjects\parse_files_to_ml\ngrams_white_files"  # Замените на путь к вашей папке для сохранения

        output_file_name = file_name + '.txt'  # Задайте имя файла
        save_sorted_ngram_counts_to_file(sorted_ngram_counts, output_folder, output_file_name)

    except FileNotFoundError:
        print(f"Ошибка: Файл не найден: {file_path}")
        sys.exit(1)
    except CsError as e:
        print(f"Ошибка Capstone: {e}")
        sys.exit(1)
    except Exception as e:
        print(f"Ошибка: {e}")
        sys.exit(1)


def process_pe_file(file_path, n):
    try:
        disassemble_pe_sections(file_path, n)
    except Exception as e:
        print(f"Ошибка при обработке файла {file_path}: {e}")


def process_pe_files_in_folder(folder_path, n):
    pool = multiprocessing.Pool()  # Создание пула процессов

    for root, dirs, files in os.walk(folder_path):
        for filename in files:
            file_path = os.path.join(root, filename)

            try:
                pe = pefile.PE(file_path)

                if pe.is_dll() or pe.is_exe():
                    machine_type = pe.FILE_HEADER.Machine
                    if machine_type == 0x8664 or machine_type == 0x14C:
                        pool.apply_async(process_pe_file, (file_path, n))

            except pefile.PEFormatError:
                pass  # Игнорировать файлы, которые не являются PE-файлами

    pool.close()
    pool.join()


def main():
    input_folder = r"C:\Users\WORKER\PycharmProjects\parse_files_to_ml\files_to_analyze"  # Замените на путь к вашей папке с PE файлами
    n = 3  # Выберите значение n для извлечения n-грамм
    process_pe_files_in_folder(input_folder, n)


if __name__ == "__main__":
    main()
Этот код выполняет дизассемблирование PE-файлов, извлекает последовательности опкодов и создает 2-граммы. Затем он сохраняет частоты 2-грамм в файлы для дальнейшей обработки.


Рекурсивный скрипт создания модели из файлов .txt
Python:
import os
from typing import Dict
import joblib

from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.svm import OneClassSVM
from sklearn.metrics import confusion_matrix, precision_recall_curve, f1_score

# Путь к папке с файлами txt
folder_path = r"C:\Users\WORKER\PycharmProjects\parse_files_to_ml\test"
# Путь для сохранения обученной модели
model_directory = 'models'  # Укажите желаемую директорию
model_filename = 'oneclasssvm_model.pkl'


# Функция для чтения текста из файла
def read_file(file_path):
    with open(file_path, 'r') as file:
        return file.read()


# Функция для подсчета инструкций и их весов
def count_instructions(text):
    instructions = {}
    lines = text.split('\n')

    for line in lines:
        parts = line
        if parts:
            weight_parts = parts.strip().split(':')
            if len(weight_parts) == 2:
                line = weight_parts[0]
                weight = int(weight_parts[1].strip())
                instructions[line] = weight
    return instructions


# Инициализируем списки для хранения настоящих и предсказанных меток, а также оценок
true_labels = []
predicted_labels = []
predicted_scores = []

# Получаем список файлов в папке
files = os.listdir(folder_path)

# Проходимся по каждому файлу в папке
for file_name in files:
    if file_name.endswith('.txt'):
        file_path = os.path.join(folder_path, file_name)
        text = read_file(file_path)
        file_instructions: Dict[str, int] = count_instructions(text)

        # Подготавливаем данные (X - признаки)
        ngram_features = list(file_instructions.keys())
        ngram_frequencies = list(file_instructions.values())

        # Преобразование названий н-грамм в векторы признаков с использованием TfidfVectorizer
        vectorizer = TfidfVectorizer()
        X = vectorizer.fit_transform(ngram_features)

        # Сохраняем TF-IDF векторизатор в файл
        vectorizer_filename = 'tfidf_vectorizer.pkl'
        vectorizer_path = os.path.join(model_directory, vectorizer_filename)
        joblib.dump(vectorizer, vectorizer_path)

        # Создаем и обучаем модель One-Class SVM (SVDD)
        clf = OneClassSVM(kernel='linear', nu=0.05)  # Настройте параметры под свою задачу
        clf.fit(X)

        # Прогнозируем аномалии (1 для нормальных, -1 для аномальных)
        y_pred = clf.predict(X)

        # Добавляем настоящие метки (1 для нормальных данных, -1 для аномалий) в список true_labels
        true_labels.extend([1] * X.shape[0])

        # Добавляем предсказанные метки в список predicted_labels
        predicted_labels.extend(y_pred.tolist())

        # Добавляем оценки аномалий в список predicted_scores
        decision_function = clf.decision_function(X)
        predicted_scores.extend(decision_function.tolist())

        # Убедитесь, что директория существует; создайте ее, если ее нет
        if not os.path.exists(model_directory):
            os.makedirs(model_directory)
        # Сохраняем обученную модель в указанный путь
        model_path = os.path.join(model_directory, model_filename)
        joblib.dump(clf, model_path)

# Вычисляем матрицу ошибок
conf_matrix = confusion_matrix(true_labels, predicted_labels)
print("Матрица ошибок:")
print(conf_matrix)

# Вычисляем Precision-Recall кривую и F1-меру
precision, recall, _ = precision_recall_curve(true_labels, predicted_scores)
f1 = f1_score(true_labels, predicted_labels)
print("F1-мера:", f1)

Этот код выполняет следующие действия:
1. Импортирует необходимые библиотеки.
2. Устанавливает пути к папке с файлами и местоположению для сохранения модели.
3. Определяет функции для чтения файлов и подсчета инструкций и их весов.
4. Инициализирует списки для хранения меток и оценок.
5. Получает список файлов в указанной папке.
6. Обрабатывает каждый файл в цикле:
- Читает текст из файла.
- Подсчитывает инструкции и их веса.
- Подготавливает данные и преобразует их в векторы с использованием TF-IDF.
- Создает и обучает модель One-Class SVM.
- Прогнозирует аномалии и сохраняет модель и TF-IDF векторизатор.
- Добавляет метки и оценки в соответствующие списки.
7. Вычисляет матрицу ошибок и выводит ее.
8. Вычисляет Precision-Recall кривую и F1-меру и выводит их значения.

Скрипт для проверки текста по модели
Python:
import os
import joblib

# Определение директории и имен файлов, где сохранены модели
model_directory = 'models'  # Директория с моделями
svm_model_filename = 'oneclasssvm_model.pkl'  # Имя файла с моделью One-Class SVM
vectorizer_filename = 'tfidf_vectorizer.pkl'  # Имя файла с TF-IDF векторизатором
svm_model_path = os.path.join(model_directory, svm_model_filename)  # Полный путь к файлу модели One-Class SVM
vectorizer_path = os.path.join(model_directory, vectorizer_filename)  # Полный путь к файлу TF-IDF векторизатора

# Загрузка сохраненной модели One-Class SVM
loaded_model = joblib.load(svm_model_path)

# Загрузка сохраненного TF-IDF векторизатора
loaded_vectorizer = joblib.load(vectorizer_path)

# Определение функции для предобработки текста и выполнения прогнозов с использованием загруженной модели
def predict_anomaly(text, loaded_model, loaded_vectorizer):
    # Преобразование входного текста с использованием загруженного TF-IDF векторизатора
    X = loaded_vectorizer.transform([text])

    # Прогнозирование, является ли входной текст аномалией или нет
    prediction = loaded_model.predict(X)

    # Прогноз будет 1 для нормальных данных и -1 для аномалий
    if prediction[0] == 1:
        return "Нормальный"
    else:
        return "Аномалия"

# Пример использования:
text_to_predict = "mov eax"
result = predict_anomaly(text_to_predict, loaded_model, loaded_vectorizer)
print(f"Прогноз: {result}")
Этот код загружает ранее сохраненную модель One-Class SVM и TF-IDF векторизатор, а затем использует их для классификации входного текста как "Нормальный" или "Аномалия".


1701764375576.png
1701764398222.png
1701764411899.png
1701764450955.png
1701764494710.png


Сугубо авторский контент!
 
Код для 32bit

парсим файлы
Python:
import multiprocessing
import os
import sys
import pefile
from capstone import *
import re
from collections import Counter


def opcode_stemmer(opcode):
    # Список шаблонов для замен
    patterns = [
        (r'^mov', 'mov'),
        (r'movabs|movsb', 'mov'),
        (r'^sub', 'sub'),
        (r'^push', 'push'),
        (r'^and', 'and'),
        (r'^cmp', 'cmp'),
        (r'^js', 'js'),
        (r'shl', 'shl'),
        (r'^add', 'add'),
        (r'^pop', 'pop')
    ]

    # Ищем подходящий шаблон и возвращаем стем
    for pattern, stem in patterns:
        if re.match(pattern, opcode):
            return stem

    # Если не нашли - возвращаем opcode без изменений
    return opcode


def disassemble_pe_sections(file_path, n):
    try:
        # Инициализация дизассемблера Capstone для архитектуры x86-64
        md = Cs(CS_ARCH_X86, CS_MODE_32)
        md.detail = True

        # Загрузка PE-файла
        pe = pefile.PE(file_path)

        # Инициализация списка для хранения последовательностей опкодов
        opcode_sequences = []

        # Итерация по секциям и дизассемблирование каждой из них
        for section in pe.sections:
            binary_data = section.get_data()

            # Дизассемблирование и создание последовательности опкодов
            opcode_sequence = []
            for i in md.disasm(binary_data, section.VirtualAddress + pe.OPTIONAL_HEADER.ImageBase):
                opcode = (i.mnemonic) + ' ' + i.op_str
                # Используем регулярное выражение для извлечения значения внутри квадратных скобок
                # match = re.search(r'0x[0-9a-fA-F]+', opcode)
                # if match:
                #     continue
                # else:
                opcode_sequences.append(opcode)

        # Extract opcode mnemonics from the instructions
        opcodes = [instruction.split()[0] for instruction in opcode_sequences]

        # Create bi-grams from the opcode mnemonics
        bi_grams = [(opcodes[i], opcodes[i + 1]) for i in range(len(opcodes) - 1)]
        # Create trigrams from the opcode mnemonics
        trigrams = [(opcodes[i], opcodes[i + 1], opcodes[i + 2]) for i in range(len(opcodes) - 2)]

        # Создаем новый список для хранения групп по три элемента (триграммы)
        result_list = []

        # Используем цикл для перебора триграмм и добавления их в новый список
        for i in range(0, len(trigrams)):
            # Получаем триграмму
            trigram = trigrams[i]

            # Преобразуем триграмму в строку и добавляем в новый список
            result_list.append(' | '.join(trigram))

        # В result_list теперь будут храниться триграммы в виде строк

        ngram_counts = Counter()
        for opcode_sequence in result_list:
            # Разбиваем строку на инструкции, используя разделитель "|"
            instructions = opcode_sequence.split(" | ")
            n = 3  # Здесь устанавливаем желаемую длину n-граммы

            opcode_list = opcode_sequence
            for i in range(len(instructions) - n + 1):
                ngram = ' | '.join(instructions[i:i + n])

                # Пропустить n-граммы, содержащие "[" или "+"
                # if "[" in ngram or "+" in ngram:
                #     continue

                # Добавляем 2-грамму в словарь или увеличиваем счетчик
                if ngram not in ngram_counts:
                    ngram_counts[ngram] = 1
                else:
                    ngram_counts[ngram] += 1

        # Печать отсортированных частот n-грамм
        sorted_ngram_counts = sorted(ngram_counts.items(), key=lambda x: x[1], reverse=True)

        def save_sorted_ngram_counts_to_file(sorted_ngram_counts, output_folder, output_file_name):
            try:
                # Объединяем путь к папке и имя файла
                output_path = os.path.join(output_folder, output_file_name)

                with open(output_path, "w") as file:
                    for ngram, freq in sorted_ngram_counts:
                        file.write(f"{ngram}: {freq}\n")
                print(f"Данные успешно сохранены в файл: {output_path}")
            except Exception as e:
                print(f"Ошибка при сохранении данных в файл: {e}")

        file_name = os.path.basename(file_path)
        output_folder = r"C:\Users\WORKER\PycharmProjects\parse_files_to_ml\ngrams_white_files32"  # Замените на путь к вашей папке для сохранения

        output_file_name = file_name + '.txt'  # Задайте имя файла
        save_sorted_ngram_counts_to_file(sorted_ngram_counts, output_folder, output_file_name)

    except FileNotFoundError:
        print(f"Ошибка: Файл не найден: {file_path}")
        sys.exit(1)
    except CsError as e:
        print(f"Ошибка Capstone: {e}")
        sys.exit(1)
    except Exception as e:
        print(f"Ошибка: {e}")
        sys.exit(1)


def process_pe_file(file_path, n):
    try:
        disassemble_pe_sections(file_path, n)
    except Exception as e:
        print(f"Ошибка при обработке файла {file_path}: {e}")


def process_pe_files_in_folder(folder_path, n):
    pool = multiprocessing.Pool()  # Создание пула процессов

    for root, dirs, files in os.walk(folder_path):
        for filename in files:
            file_path = os.path.join(root, filename)

            try:
                pe = pefile.PE(file_path)

                if pe.is_dll() or pe.is_exe():
                    machine_type = pe.FILE_HEADER.Machine
                    if machine_type == 0x14C:
                        pool.apply_async(process_pe_file, (file_path, n))

            except pefile.PEFormatError:
                pass  # Игнорировать файлы, которые не являются PE-файлами

    pool.close()
    pool.join()


def main():
    input_folder = r"C:\Users\WORKER\Downloads\ru_office_2003_pro_with_sp3_and_updates_vl_v3"  # Замените на путь к вашей папке с PE файлами
    # input_folder = r"C:\Users\WORKER\PycharmProjects\parse_files_to_ml\files_to_analyze32"  # Замените на путь к вашей папке с PE файлами
    n = 3  # Выберите значение n для извлечения n-грамм
    process_pe_files_in_folder(input_folder, n)


if __name__ == "__main__":
    main()

строим модель из файлов

Python:
import os
import joblib
from typing import Dict
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.svm import OneClassSVM
from sklearn.metrics import confusion_matrix, precision_recall_curve, f1_score
from joblib import Parallel, delayed

# Путь к папке с файлами txt
folder_path = r"C:\Users\WORKER\PycharmProjects\parse_files_to_ml\ngrams_white_files32"
# Путь для сохранения обученной модели
model_directory = 'models32'  # Укажите желаемую директорию
model_filename = 'oneclasssvm_model.pkl'
vectorizer_filename = 'tfidf_vectorizer.pkl'
VES_DLYA_OBREZA = 0

# Функция для чтения текста из файла
def read_file(file_path):
    with open(file_path, 'r') as file:
        return file.read()

# Функция для извлечения признаков из текста
def extract_features(text):
    instructions = {}
    lines = text.split('\n')

    for line in lines:
        parts = line.strip().split(':')
        if len(parts) == 2:
            instruction = parts[0]
            weight = int(parts[1].strip())
            if weight >= VES_DLYA_OBREZA:
                instructions[instruction] = weight
    return instructions

# Инициализируем списки для хранения настоящих и предсказанных меток, а также оценок
true_labels = []
predicted_labels = []
predicted_scores = []

# Создаем один TfidfVectorizer для всех файлов
vectorizer = TfidfVectorizer()

# Список для собирания признаков из всех файлов
all_features = []

# Получаем список файлов в папке
files = os.listdir(folder_path)

# Проходимся по каждому файлу в папке
for index, file_name in enumerate(files, start=1):
    if file_name.endswith('.txt'):
        file_path = os.path.join(folder_path, file_name)
        text = read_file(file_path)
        file_instructions: Dict[str, int] = extract_features(text)

        # Объединяем инструкции из всех файлов
        all_features.extend(file_instructions.keys())

        # Все инструкции считаем нормальными
        true_labels.extend([1] * len(file_instructions))

        # Выводим текущий статус обработки
        print(f"Обработано файлов: {index}/{len(files)}")

# Обновляем TfidfVectorizer с новыми данными
vectorizer.fit(all_features)

# Преобразование названий н-грамм в векторы признаков
X = vectorizer.transform(all_features)
X = X.tocsr(copy=True)

def calculate_f1(clf, X, y_true):
    y_pred = clf.predict(X)
    f1 = f1_score(y_true, y_pred)
    return f1

def train_model(params, X):
    X = X.copy()
    clf = OneClassSVM(**params)
    clf.fit(X)
    f1 = calculate_f1(clf, X, true_labels)  # вычислить F1 на валидации
    return clf, f1


parameters_list = [
    {"kernel": "linear", "nu": 0.01},
    {"kernel": "rbf", "nu": 0.01, "gamma": "auto"},
    {"kernel": "sigmoid", "nu": 0.05, "gamma": 0.001},
    {"kernel": "linear", "nu": 0.05},
]


best_clf = Parallel(n_jobs=61)(delayed(train_model)(params, X)
                              for params in parameters_list)

# Выбираем лучшую модель
best_clf, best_f1 = max(best_clf, key=lambda x: x[1])

# Прогнозируем аномалии
y_pred = best_clf.predict(X)

# Оценка качества, сохранение модели

# Добавляем предсказанные метки и оценки аномалий
predicted_labels.extend(y_pred.tolist())
decision_function = best_clf.decision_function(X)
predicted_scores.extend(decision_function.tolist())

# Убедитесь, что директория существует; создайте ее, если ее нет
if not os.path.exists(model_directory):
    os.makedirs(model_directory)

# Сохраняем обученную модель и TfidfVectorizer в указанные файлы
model_path = os.path.join(model_directory, model_filename)
joblib.dump(best_clf, model_path)

vectorizer_path = os.path.join(model_directory, vectorizer_filename)
joblib.dump(vectorizer, vectorizer_path)

# Вычисляем матрицу ошибок
conf_matrix = confusion_matrix(true_labels, predicted_labels)
print("Матрица ошибок:")
print(conf_matrix)

# Вычисляем Precision-Recall кривую и F1-меру
precision, recall, _ = precision_recall_curve(true_labels, predicted_scores)
f1 = f1_score(true_labels, predicted_labels)
print("F1-мера:", f1)

тестим модель по файлу

Python:
import multiprocessing
import os
import sys

import joblib
import pefile
from capstone import *
import re
from collections import Counter

files_to_delete = []


def opcode_stemmer(opcode):
    # Список шаблонов для замен
    patterns = [
        (r'^mov', 'mov'),
        (r'movabs|movsb', 'mov'),
        (r'^sub', 'sub'),
        (r'^push', 'push'),
        (r'^and', 'and'),
        (r'^cmp', 'cmp'),
        (r'^js', 'js'),
        (r'shl', 'shl'),
        (r'^add', 'add'),
        (r'^pop', 'pop')
    ]

    # Ищем подходящий шаблон и возвращаем стем
    for pattern, stem in patterns:
        if re.match(pattern, opcode):
            return stem

    # Если не нашли - возвращаем opcode без изменений
    return opcode

def disassemble_pe_sections(file_path, n):
    try:
        # Инициализация дизассемблера Capstone для архитектуры x86-64
        md = Cs(CS_ARCH_X86, CS_MODE_64)
        md.detail = True

        # Загрузка PE-файла
        pe = pefile.PE(file_path)

        # Инициализация списка для хранения последовательностей опкодов
        opcode_sequences = []

        # Итерация по секциям и дизассемблирование каждой из них
        for section in pe.sections:
            binary_data = section.get_data()

            # Дизассемблирование и создание последовательности опкодов
            opcode_sequence = []
            for i in md.disasm(binary_data, section.VirtualAddress + pe.OPTIONAL_HEADER.ImageBase):
                opcode = (i.mnemonic) + ' ' + i.op_str
                opcode_sequences.append(opcode)
            # Extract opcode mnemonics from the instructions
        opcodes = [instruction.split()[0] for instruction in opcode_sequences]

        # Create trigrams from the opcode mnemonics
        trigrams = [(opcodes[i], opcodes[i + 1], opcodes[i + 2]) for i in range(len(opcodes) - 2)]
        # Определение директории и имен файлов, где сохранены модели
        model_directory = 'models32'  # Директория с моделями
        svm_model_filename = 'oneclasssvm_model.pkl'  # Имя файла с моделью One-Class SVM
        vectorizer_filename = 'tfidf_vectorizer.pkl'  # Имя файла с TF-IDF векторизатором
        svm_model_path = os.path.join(model_directory, svm_model_filename)  # Полный путь к файлу модели One-Class SVM
        vectorizer_path = os.path.join(model_directory, vectorizer_filename)  # Полный путь к файлу TF-IDF векторизатора

        # Загрузка сохраненной модели One-Class SVM
        loaded_model = joblib.load(svm_model_path)

        # Загрузка сохраненного TF-IDF векторизатора
        loaded_vectorizer = joblib.load(vectorizer_path)

        # Определение функции для предобработки текста и выполнения прогнозов с использованием загруженной модели
        def predict_anomaly(text, loaded_model, loaded_vectorizer):
            # Преобразование входного текста с использованием загруженного TF-IDF векторизатора
            X = loaded_vectorizer.transform([text])

            # Прогнозирование, является ли входной текст аномалией или нет
            prediction = loaded_model.predict(X)

            # Прогноз будет 1 для нормальных данных и -1 для аномалий
            if prediction[0] == 1:
                return "Нормальный"
            else:
                return "Аномалия"

        # Создайте две переменные для подсчета нормальных и аномальных значений
        normal_count = 0
        anomaly_count = 0
        result_list = []

        # Объединяем кортежи в список с разделителем "|"
        trigrams_converted = [' | '.join(trigram) for trigram in trigrams]

        # Пример использования:
        for t in trigrams_converted:
            result = predict_anomaly(t, loaded_model, loaded_vectorizer)
            # Подсчитайте нормальные и аномальные значения
            if result == "Нормальный":
                normal_count += 1
                #print('norm: ' + t)

            else:
                anomaly_count += 1
                print('anomaly: ' + t)
            k=1
        # Вычислите процентное соотношение нормальных и аномальных значений
        total_count = len(opcode_sequences)
        normal_percentage = (normal_count / total_count) * 100
        anomaly_percentage = (anomaly_count / total_count) * 100
        # Добавьте процентные значения в новый список
        result_list.append(("Нормальный", normal_percentage))
        result_list.append(("Аномалия", anomaly_percentage))
        # Выведите новый список
        file_name = os.path.basename(file_path)
        print(file_name)
        print('anom count: ' + str(anomaly_count))
        print('norm count: '+str(normal_count))
        print(result_list)
        # Создайте и запишите информацию в файл to_clean.txt
        with open('to_clean.txt', 'a') as file:
            file.write(f"{file_path}: {anomaly_percentage}\n")

        print("Файл to_clean.txt успешно создан.")

    except FileNotFoundError:
        print(f"Ошибка: Файл не найден: {file_path}")
        sys.exit(1)
    except CsError as e:
        print(f"Ошибка Capstone: {e}")
        sys.exit(1)
    except Exception as e:
        print(f"Ошибка: {e}")
        sys.exit(1)


def process_pe_file(file_path, n):
    try:
        disassemble_pe_sections(file_path, n)
    except Exception as e:
        print(f"Ошибка при обработке файла {file_path}: {e}")


def process_pe_files_in_folder(folder_path, n):
    pool = multiprocessing.Pool()  # Создание пула процессов

    for root, dirs, files in os.walk(folder_path):
        for filename in files:
            file_path = os.path.join(root, filename)

            try:
                pe = pefile.PE(file_path)

                if pe.is_dll() or pe.is_exe():
                    machine_type = pe.FILE_HEADER.Machine
                    if machine_type == 0x8664 or machine_type == 0x14C:
                        pool.apply_async(process_pe_file, (file_path, n))

            except pefile.PEFormatError:
                pass  # Игнорировать файлы, которые не являются PE-файлами

    pool.close()
    pool.join()


def main():
    input_folder = r"C:\Users\WORKER\PycharmProjects\parse_files_to_ml\single32"  # Замените на путь к вашей папке с PE файлами
    n = 3  # Выберите значение n для извлечения n-грамм
    process_pe_files_in_folder(input_folder, n)




if __name__ == "__main__":
    main()

Улучшил генерация ngram теперь выдает такие срезы .
add | add | add: 3035
push | push | call: 1229
mov | mov | mov: 1136
push | push | push: 1130
push | call | pop: 767
push | call | add: 532
push | call | mov: 492
lea | push | push: 472
push | mov | push: 403
mov | push | push: 378
pop | pop | pop: 375
push | lea | push: 343
call | pop | pop: 301
push | push | mov: 285
mov | push | mov: 253
ret | push | mov: 250
lea | push | call: 227
pop | pop | ret: 224
push | mov | mov: 219
lea | push | lea: 210
call | mov | pop: 198
pop | ret | push: 198
mov | test | je: 192
mov | lea | push: 183
mov | pop | pop: 182
mov | push | call: 178
pop | push | call: 175
mov | mov | push: 174

Точность повысилась в разы
1702084372642.png
 
Последнее редактирование:
модель для x86 натренирована на ru_office_2003_pro_with_sp3_and_updates_vl_v3

x64 тоже на оффисе
 

Вложения

  • models32.zip
    77.2 КБ · Просмотры: 7
  • models64.zip
    241.3 КБ · Просмотры: 11
Последнее редактирование:
используйте thundersvm
пободрее будет
1702151615732.png

* тут обратите внимание на gpu

pytorch не удалось завести
cuml для линукса, svdd не нашел внутри

1702151527839.png


1702151985192.png


чтобы завести придется немного схитрить
пишем from thundersvm.thundersvm import OneClassSVM вместо обычного
внутри thundersvm.py редачим путь к длл (ее отдельно через cmake скомпилить)
1702152533907.png



Код для обучения через GPU

Python:
import os
import joblib
from typing import Dict
from sklearn.feature_extraction.text import TfidfVectorizer
from thundersvm.thundersvm import OneClassSVM
from sklearn.metrics import confusion_matrix, precision_recall_curve, f1_score
from sklearn.model_selection import train_test_split

# Path to the folder containing txt files
folder_path = r"C:\Users\WORKER\PycharmProjects\parse_files_to_ml\ngrams_white_files32"
# Path to save the trained model
model_directory = 'models32'  # Specify your desired directory
model_filename = 'oneclasssvm_model.pkl'
vectorizer_filename = 'tfidf_vectorizer.pkl'
VES_DLYA_OBREZA = 3
import time  # Импортируйте модуль time

# Засеките время начала тренировки
start_time = time.time()
# Function to read text from a file
def read_file(file_path):
    with open(file_path, 'r') as file:
        return file.read()


# Function to extract features from text
def extract_features(text):
    instructions = {}
    lines = text.split('\n')

    for line in lines:
        parts = line.strip().split(':')
        if len(parts) == 2:
            instruction = parts[0]
            weight = int(parts[1].strip())
            if weight >= VES_DLYA_OBREZA:
                instructions[instruction] = weight
    return instructions


# Initialize lists to store true and predicted labels, as well as scores
true_labels = []
predicted_labels = []
predicted_scores = []

# Create a single TfidfVectorizer for all files
vectorizer = TfidfVectorizer()

# List to collect features from all files
all_features = []

# Get the list of files in the folder
files = os.listdir(folder_path)

# Iterate through each file in the folder
for index, file_name in enumerate(files, start=1):
    if file_name.endswith('.txt'):
        file_path = os.path.join(folder_path, file_name)
        text = read_file(file_path)
        file_instructions: Dict[str, int] = extract_features(text)

        # Combine instructions from all files
        all_features.extend(file_instructions.keys())

        # Consider all instructions as normal
        true_labels.extend([1] * len(file_instructions))

        # Print the current processing status
        print(f"Processed files: {index}/{len(files)}")

# Update the TfidfVectorizer with new data
vectorizer.fit(all_features)

# Convert n-gram names to feature vectors
X = vectorizer.transform(all_features)
X = X.tocsr(copy=True)

# Split the data into training, validation, and test sets
X_train, X_temp, y_train, y_temp = train_test_split(X, true_labels, test_size=0.3, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)


def calculate_f1(clf, X, y_true):
    y_pred = clf.predict(X)
    f1 = f1_score(y_true, y_pred)
    return f1


def train_model(params, X):
    X = X.copy()
    clf = OneClassSVM(**params)
    clf.fit(X)
    f1 = calculate_f1(clf, X, y_train)  # Calculate F1 score on validation
    return clf, f1


parameters_list = [
    {"kernel": "linear", "nu": 0.01},
]

best_f1_score = 0
best_model = None

for params in parameters_list:
    clf, f1 = train_model(params, X_train)

    if f1 > best_f1_score:
        best_f1_score = f1
        best_model = clf

# Predict anomalies on the validation set
y_pred_val = best_model.predict(X_val)

# Add predicted labels and anomaly scores
predicted_labels.extend(y_pred_val.tolist())
decision_function = best_model.decision_function(X_val)
predicted_scores.extend(decision_function.tolist())

# Compute the confusion matrix on the validation set
conf_matrix_val = confusion_matrix(y_val, y_pred_val)
print("Confusion matrix on the validation set:")
print(conf_matrix_val)

# Calculate Precision-Recall curve and F1-score on the validation set
precision_val, recall_val, _ = precision_recall_curve(y_val, predicted_scores)
f1_val = f1_score(y_val, y_pred_val)
print("F1-score on the validation set:", f1_val)

# Choose the best model and predict anomalies on the test set
y_pred_test = best_model.predict(X_test)

# Evaluate the performance on the test set
conf_matrix_test = confusion_matrix(y_test, y_pred_test)
print("Confusion matrix on the test set:")
print(conf_matrix_test)

precision_test, recall_test, _ = precision_recall_curve(y_test, y_pred_test)
f1_test = f1_score(y_test, y_pred_test)
print("F1-score on the test set:", f1_test)

# Save the trained model and TfidfVectorizer to the specified files
if not os.path.exists(model_directory):
    os.makedirs(model_directory)

model_path = os.path.join(model_directory, model_filename)
joblib.dump(best_model, model_path)

vectorizer_path = os.path.join(model_directory, vectorizer_filename)
joblib.dump(vectorizer, vectorizer_path)
# Получить текущее время после завершения обучения
end_time = time.time()
# Рассчитайте время, затраченное на обучение
training_time = end_time - start_time

print(f"Время обучения модели: {training_time} секунд")
1702153058767.png

это обучение всех файлов что спарсил из папки C:\Windows для 32bit

1702153129200.png

*моделька


Определение нормальных файлов
1702154306200.png
 

Вложения

  • thundersvm_dlls.zip
    1.2 МБ · Просмотры: 7
Последнее редактирование:


Напишите ответ...
  • Вставить:
Прикрепить файлы
Верх