• XSS.stack #1 – первый литературный журнал от юзеров форума

Статья Merge File Sort или быстрая сортировка строк и удаление дублей интерпретатором в файле от 400 GB за счет ПЗУ

Мне очень понравилось. Пользуюсь уже давно. Причём работает без сортировки.
И параметры самому можно выставить.
Этот софт работает по временным файлам?
 
Пожалуйста, обратите внимание, что пользователь заблокирован
Этот софт работает по временным файлам?
Скорее всего.
Потому что на HDD для работы места много нужно.
Но работает быстро.
 
Количество строк уникальных неправильно рассчитано.=)
В мультипроцессорной версии поправил обнаруженные баги, оптимизировал код, за счет этого чуть-чуть увеличилась скорость обработки алгоритма.

1727184253424.png


P.S. Добавил комментариев в код.


Многопроцессорная версия, обязательна к обновлению.

P.S.S. Для ускорения можно пробовать разными размерами чанков скармливать лог по своему усмотрению, там где сейчас есть цифра в коде "2000000" меняешь к примеру на "4000000" ну и регулируешь под свое железо в лучшую или худшую сторону, замеряя финальное время обработки скрипта и сравнивать с предыдущим результатом. Но я определил лучшую скорость для своего лога в 2 гига тестового, 2000000 чанков на процесс. Также чтобы усилить нагрузку на железо, можно попробовать в отдельных 2, 3, 4.... системных процесса запустить, к примеру 2 разных лога, на разных папках в 2 консоли) для того чтобы увеличить нагрузку на устройство которым обрабатываете файл. Ну и полезно для обработки сразу нескольких разных логов к примеру.

P.S.S.S И еще, по возможности кто может протестить на разных операционных системах где работает Python, не только на Windows. (Linux, Android, Unix, и т.д.)
 
Последнее редактирование:
Сгенерил для линукса тестовый файл с рандомными фейковыми данными в 160 гигов такого формата:
Код:
Dott. Teresa Novaro/sanguinetisimonetta@tin.it/Bocenago/50.34.246.179
Dott. Paoletta Parpinel/xmorabito@virgilio.it/Giuliano Di Lecce/153.160.84.95
Michelotto Zito/calgariamalia@fastwebnet.it/Quattro Castella/98.12.70.87
Ermes Vattimo-Bompiani/gaspare51@poste.it/Salaparuta/155.233.251.224
Girolamo Prodi/giuliettaudinesi@virgilio.it/Solarino/239.140.189.253
Cassandra Rosiello/flaianovincenzo@alice.it/San Giacomo Di Spoleto/10.215.32.156
Vanessa Cesaroni/carmelo32@alice.it/Roraipiccolo/227.224.170.181
Maura Mercadante/conteludovico@tin.it/San Giorgio Della Richinvelda/112.61.44.170
Giuliana Rossetti/damianogonzaga@vodafone.it/Is Murdegus/156.253.48.96
Leonardo Schiavone/silvio63@virgilio.it/Piazza Del Galdo/68.42.159.62
Donatello Polesel/marta44@poste.it/Vezza D'Oglio/135.157.40.240
Angelo Chigi/napoleonecomeriato@virgilio.it/Budrie/77.28.147.237
Liliana Guidotti/ynicolini@fastwebnet.it/San Giacomo Roncole/108.69.232.28
Aurora Cammarata/nosigliasergius@poste.it/Ischia Ponte/252.125.118.59
Licia Colletti/xtamborini@alice.it/Garbagnate Monastero/163.149.79.234
Gian Tutino/qcasaleggio@vodafone.it/Denice/159.152.52.28
Stella Tosi-Serraglio/halfonsi@virgilio.it/Gazzada Schianno/177.33.104.102
Fernanda Staglieno/paulinaschiaparelli@tin.it/Nasidi/122.54.56.12
Sig.ra Milena Saragat/ugo85@poste.it/Torri Del Benaco/133.24.49.250
Dott. Milena Briccialdi/marcantonio17@libero.it/Casatenovo/174.111.142.188
Flora Barozzi/yricciardi@vodafone.it/Montegiordano Marina/26.74.230.105
Claudia Ferraris-Verdone/dpisani@virgilio.it/Figline Di Prato/234.237.0.201
Luigina Pizzo-Bossi/fmontanari@libero.it/Canonica D'Adda/10.57.145.77
Marta Sgarbi-Brunello/anitaalighieri@fastwebnet.it/Sciconi/109.155.223.64
Pierina Trincavelli-Ferraris/auroratrapani@tim.it/Corfinio/37.221.146.217
Romina Ossani/mazzacuratipina@vodafone.it/Nicosia/129.127.53.129
Pierina Deledda/baccio68@tin.it/Vighignolo/79.179.200.241
Roberto Munari-Solari/stefanoalighieri@vodafone.it/Serbadone/132.52.83.120
Licia Collodi/salandramaurilio@tin.it/Sutri/97.198.115.250
Germana Vespucci/guidomercadante@tin.it/Casinalbo/85.22.23.93
Romana Columbo/stefaniabriccialdi@fastwebnet.it/Castelbellino/89.238.204.173
Dott. Dolores Battisti/hpertile@libero.it/Canalicchio/173.190.108.109
Raffaella Biagiotti/vanessamazzanti@tim.it/San Nicandro Garganico/145.220.205.56
Azeglio Endrizzi/oariasso@tin.it/Niscemi/83.111.120.46
Etta Donati/cardanogustavo@poste.it/Budrione/90.109.207.243
Silvio Camuccini/virgilio41@libero.it/Pedalino/45.196.75.29
Danilo Campise/cilibrasierika@alice.it/Arcisate/211.187.249.215
Dott. Bianca Mennea/mduodo@virgilio.it/Occimiano/29.102.219.171
Raffaello Tamburini/manacordafabrizia@virgilio.it/Casumaro/8.95.40.13
Massimiliano Guglielmi/giannatorricelli@poste.it/La Muddizza/208.216.64.5
Danilo Almagià/ovalentino@tin.it/San Nicola/168.27.112.98
Ezio Gibilisco-Vigorelli/graziella45@alice.it/Carro/208.64.122.168
Ida Ruberto/ealfieri@libero.it/Cerreto Castello/236.232.212.218
Adele Montecchi/sabatino98@virgilio.it/Montabone/36.138.27.131
Paola Ferragamo-Gozzi/buscettagiulia@fastwebnet.it/Viadanica/133.199.141.213
Sylvia Lattuada-Bonolis/gorigreco@tim.it/San Vito Al Torre/204.218.9.34
Dott. Ivo Corradi/volterrastefania@libero.it/Bellizzi Irpino/173.71.237.187
Sig. Alessio Maggioli/giacobbecabibbo@tin.it/Clivio/21.159.179.146
Ernesto Altera/puccimaurilio@alice.it/Carditello/128.25.241.155
Patrizio Micheletti/kpriuli@alice.it/Tegoleto/64.145.145.15
Gian Traversa/marialeonardi@tin.it/Porto Valtravaglia/108.244.255.37
Flavio Serao/landolancisi@tim.it/Matierno/128.126.213.41
Sig.ra Martina Cugia/lazzaromantegna@alice.it/Belmonte Calabro Marina/100.111.229.255
Claudia Gottardi/isabellaluria@fastwebnet.it/Cusio/130.128.198.6
Telemaco Miniati/mpometta@libero.it/Apricale/202.239.109.246
Isa Salgari/gianmarcogermano@libero.it/San Patrizio/119.244.95.106
Durante Morandi-Beffa/liviabelletini@vodafone.it/Bernalda/64.4.198.200
Gianluca Branciforte-Traversa/pomponio23@vodafone.it/Druento/1.177.49.56
Marisa Agnesi/pasqualacerbi@libero.it/Su Cossu/165.95.182.48
Dott. Raffaellino Togliatti/trobbianimichela@alice.it/Sant'Alfio/15.70.213.155
Goffredo Zoppetti/arnaldotoldo@fastwebnet.it/Ca' De' Fabbri/254.134.137.195
Elmo Argan/atenulf54@tim.it/Ravarino/28.245.146.184
Dott. Panfilo Ortese/qsobrero@tin.it/Nola/196.193.82.208
Armando Redi/domenicopisacane@vodafone.it/Caprarico/111.201.129.236
Gino Bellocchio-Condoleo/porcellatoadriana@poste.it/Rosignano Monferrato/112.133.255.209
Augusto Camanni/maria74@tim.it/Senigallia/124.220.135.7
Napoleone Petrocelli/lcavanna@fastwebnet.it/Rosello/186.105.134.221
Tiziano Avogadro-Petralli/renata01@virgilio.it/Cannuzzo/199.88.147.49
Isabella Respighi/piergiorgiosperi@libero.it/Osimo Stazione/26.111.250.131
Gianluigi Mazzeo/etedesco@libero.it/Peregallo/23.151.3.169
Fiorenzo Fuseli-Cattaneo/baldassarefederico@virgilio.it/Asolo/7.97.11.200
Tullio Morpurgo-Morabito/cimarosaannalisa@fastwebnet.it/Piancaldoli/5.38.123.158
Amedeo Tropea/ubiagi@alice.it/Dragoni/223.101.126.214
Rosina Fuseli/acuda@tin.it/San Potito Sannitico/63.3.24.46
Ramona Desio/jboezio@virgilio.it/San Quirico Trecasali/219.207.3.80
Lucia Segni/priscilla75@tin.it/Tapogliano/227.115.109.26
Flora Proietti/tullio65@vodafone.it/Eremo Sant'Anna/253.77.218.86
Berenice Rubbia/girolamo28@poste.it/Laureana Di Borrello/27.191.158.216
Gastone Micheletti/sabatinovecellio@tin.it/Zerba/126.190.90.106
Benedetto Brancaccio/osvaldo33@tin.it/Pregiato/1.9.214.134
Lara Turrini/priuliguido@vodafone.it/Fabriano/48.10.60.68
Sig.ra Teresa Renier/costanzo25@alice.it/Oreno/205.9.100.132
Gemma Camiscione/atomasetti@alice.it/Vesime/33.186.212.9
Licia Berlusconi/inzaghitullio@virgilio.it/Palese/173.50.29.79
Pina Caffarelli/ugarzoni@vodafone.it/Parre Ponte Selva/97.240.226.63
Dott. Patrizia Basso/nolciniilaria@libero.it/Brufa/255.244.150.199
Anita Baroffio/catenazziantonina@libero.it/Zancona/149.142.89.242
Achille Gualandi-Collodi/arturo62@virgilio.it/Villa Passo/131.50.118.17
Donna Zetticci/gbarillaro@libero.it/Lajen/193.120.134.74
Nino Toso/nicoletta19@alice.it/Ciavolo/173.176.219.139
Berenice Piazzi/roccastella@tin.it/Casavatore/253.2.92.64
Imelda Faggiani/migliacciograziella@alice.it/Monte San Quirico/8.221.93.66
Carolina Farina/imelda18@virgilio.it/Lesignana/189.170.116.100
Diana Tamburi/melinarenier@fastwebnet.it/Macchia D'Isernia/129.140.11.28
Piero Canil/camicioneaugusto@tin.it/Saviore Dell'Adamello/220.23.13.61
Ninetta Ubaldi/zpisani@libero.it/Castiglione Di Ravenna/37.113.93.81
Ludovico Soprano/bellinienzio@fastwebnet.it/Sordio/29.146.93.77
Victoria Carpaccio/taliercioadriana@tim.it/Galgiana/81.130.155.76
Ferdinando Alonzi/cicalagiampiero@tin.it/Fabbrico/189.29.151.114
Priscilla Govoni-Giusti/traettaalina@poste.it/San Fermo/72.133.233.250
Giampiero Riccardi-Castioni/nmontalcini@virgilio.it/Lesa/107.152.55.87
Dott. Eva Zoppetto/vincenzopanicucci@poste.it/Corio/1.103.27.59
Dott. Ludovico Zola/edoardo00@libero.it/Bressa/109.201.79.63
Aldo Interminelli/ggiammusso@poste.it/Pile/48.47.113.247
Beatrice Sonnino/nadia16@tin.it/Barbania/63.103.137.231
Gastone Finotto/stoppaninicola@alice.it/Mergozzo/16.88.30.229
Elvira Faranda/boccionifabia@tin.it/Carlentini/255.17.216.3
Aurora Solimena/tagliafierrogeronimo@poste.it/Santa Chiara Di Nardo'/31.75.85.100
Vittoria Jovinelli/tcianciolo@libero.it/Revigliasco Torinese/190.178.155.197
Fulvio Troisi/teresalamborghini@poste.it/Breno/73.23.33.138
Alessio Bossi/donatellacasini@libero.it/Baragiano/38.188.134.12
Dott. Laureano Raimondi/ipersico@tin.it/Castelletto D'Orba/3.244.144.17
Sig. Rembrandt Montanari/amletorenier@poste.it/Bozen/106.245.162.42
Sig.ra Giulietta Barcaccia/lolita97@vodafone.it/Longobardi/198.135.81.184
Sig. Azeglio Olivetti/danilo04@libero.it/Cogollo/130.197.86.184
Sig. Orlando Cuda/cannizzarocorrado@poste.it/Gorzone/141.26.163.141
Panfilo Pontecorvo/bertoluccigianni@alice.it/Santa Maria Di Bobbio/78.7.34.249
Nicoletta Aulenti/adrianozanichelli@tim.it/Alice Superiore/122.238.27.250
Fabrizia Guidone/montesanocecilia@vodafone.it/Piani Di Borghetto/87.168.5.22
Azeglio Antonetti/romma@alice.it/Pontestazzemese/151.14.113.179
Massimo Pellico-Simeoni/zagurigianfranco@libero.it/Terrazze/99.23.218.248
Ranieri Piacentini/carolina70@poste.it/Sant'Andrea In Monte/15.1.102.208
Fabio Ginese/luxardoleone@vodafone.it/Fino Mornasco/50.72.129.123
Amedeo Sagese/croth@tin.it/Marengo/78.59.86.241
Nicola Tosto-Verga/guarino86@libero.it/Taisten/157.163.40.219
Ignazio Comisso/yburcardo@vodafone.it/Montebello Sul Sangro/210.48.204.100
Donatella Palazzo/ecrispi@vodafone.it/San Potito/113.26.109.81
Matilda Solari/neddagremese@tin.it/St. Peter in Ahrn/15.191.162.169
Serena Torlonia-Alighieri/germanaarmani@vodafone.it/Cerreto Sannita/47.219.225.21
Aldo Bignardi/romina55@fastwebnet.it/Ferrania/167.140.42.213
Bernardo Bertoni/pellicosole@libero.it/Villanova Del Sillaro/151.167.139.85
Sig.ra Nedda Montanari/pizzamanopriscilla@libero.it/Usago/180.249.186.165
Ennio Venturi/efantini@virgilio.it/Colli Di Suso/20.139.82.135
Jolanda Bosio/pompeo53@vodafone.it/Pflaurenz/68.110.32.70
Pellegrino Canevascini/massimiliano26@libero.it/Tesoriero/58.111.112.87
Dott. Hugo Crispi/tencallafrancesca@tim.it/Donori'/70.84.155.119
Sig.ra Adelasia Gussoni/francescodallape@fastwebnet.it/Avigliano Scalo/209.170.203.90
Sole Pepe/ninettadonati@vodafone.it/Siliqua/111.216.85.71
Dott. Livia Montanari/barbaranavarria@virgilio.it/Torre Del Pozzo/0.114.23.150
Ruggero Asprucci/hmigliaccio@tim.it/Borghetto Lodigiano/108.171.90.9
Paoletta Bruscantini/ybonino@tim.it/Onigo/24.195.176.225
Amico Spanevello/npelli@tim.it/Cavazzoli/183.120.195.179
Maria Campano/silvestro87@tin.it/Molochio/80.196.7.26
Antonella Emanuelli/ebeccheria@libero.it/Miazzina/77.89.13.88
Greca Amato/gianmarcofarinelli@alice.it/Villa Lagarina/244.242.248.55
Lilla Cociarelli-Roncalli/fpratesi@tin.it/Riola/58.177.200.209
Martino Sonnino/greco49@fastwebnet.it/Peschici/222.114.214.17
Leonardo Doria/ferdinandobuscetta@vodafone.it/Ponzano Monferrato/199.195.156.125
Alfredo Biagi/zoppettovincenzo@tin.it/Maissana/74.118.196.28
Orlando Carli-Gadda/lamborghinitatiana@vodafone.it/Stanghella/13.166.191.112
Orazio Basadonna/pomponio33@vodafone.it/Mezzocorona/237.194.191.168
Annunziata Porcellato/alberto72@poste.it/Bagnolo Di Lonigo/196.88.39.75
Ninetta Giradello/patriziaguinizzelli@tin.it/Vezzan/49.133.128.181
Giovanna Soffici/antonellodellucci@libero.it/Sant'Angelo In Villa/173.102.186.20
Matteo Boccaccio/nicolapetrucelli@virgilio.it/Aquileia/211.200.136.36
Telemaco Ruffini/mirco75@vodafone.it/Guidonia Montecelio/233.61.224.35
Cristina Benussi-Bottaro/tarmani@tim.it/Crema/175.40.216.161
Laureano Bottaro/wcremonesi@poste.it/Collemoresco/131.254.247.250
Alfredo Trapani/babatoadamo@libero.it/Portopalo Di Capo Passero/32.144.45.224
Sig.ra Tina Spallanzani/cutulidonatello@fastwebnet.it/Briano/195.167.231.206
Ennio Scandone/granatellimichele@tim.it/Cognola/13.196.2.248
Eva Ruberto/adelmogaluppi@virgilio.it/Premaor/255.231.238.80
Mariana Caruso/rpacomio@virgilio.it/Desenzano Al Serio/224.32.83.187
Alessandra Taliercio/mgolino@virgilio.it/Excenex/133.197.255.47
Filippa Balbo/paulina05@poste.it/Casalmaiocco/246.59.240.71
Diana Galtarossa/cirillo48@tin.it/Mondovi'/170.150.128.3
Bettina Giacometti/emma32@virgilio.it/Vitulazio/204.106.241.142
Gelsomina Garibaldi/sabatinoantonelli@alice.it/Riolo Terme/38.201.70.185
Vanessa Maspero/errigozaira@tim.it/Grignasco/129.76.238.62
Sig.ra Filippa Canali/valentina01@tin.it/Salcito/184.163.66.182
Annibale Golgi/xpacillo@virgilio.it/Vagli Sopra/242.171.129.43
Elena Poerio-Ruffini/vivaldimilo@vodafone.it/Calimera Calabra/31.226.236.197
Elladio Pacillo/satrianilivio@libero.it/Frasso Telesino/179.88.165.89
Pasquale Pelli/hrismondo@libero.it/Tanaunella/67.188.97.159
Dott. Elena Torricelli/nico11@virgilio.it/Lattughelle/93.136.156.69
Flavio Schiavone/martagiannotti@virgilio.it/Santa Maria Del Piave/0.92.206.226
Monica Gualtieri/peanosimonetta@libero.it/Santa Maria Lo Piano/119.231.217.92
Evangelista Bombieri/lorenzomilanesi@vodafone.it/Schlinig/23.125.115.200
Elvira Turati/otrezzini@tim.it/Mortizza/223.114.89.111
Adriana Bettin/xcainero@tim.it/Premosello Chiovenda/53.124.122.1
Fabrizia Caironi/vittorio20@tin.it/Monale/175.225.212.6
Ansaldo Vigliotti/benussimirco@fastwebnet.it/Ripa/101.208.55.99
Fiamma Gatto-Gagliardi/vanessasaragat@virgilio.it/Sirai/228.169.169.78
Dott. Ermes Paganini/casagrandealessio@tim.it/Lumignacco/68.187.4.133
Ludovica Villadicani-Balbo/loprestimichelangelo@fastwebnet.it/Montefiorino/30.52.160.236
Giorgio Prati/russomonica@fastwebnet.it/Azeglio/53.28.178.82
Michela Gagliardi/victoria14@libero.it/Capalbio Stazione/122.197.197.82
Silvia Curatoli/ggolgi@alice.it/Crocette/61.45.235.198
Raffaellino Mezzetta/vecolivittoria@fastwebnet.it/San Felice Sul Panaro/206.92.58.117
Antonello Pisani/wferrari@alice.it/Colombare Di Sirmione/228.95.63.117
Sophia Satriani-Monduzzi/crubbia@vodafone.it/Camini/133.229.37.220
Sante Cainero-Sonnino/federico98@vodafone.it/Fontana Fredda/11.92.145.87
Maurilio Carullo/aperini@tim.it/Windegg/19.105.15.203
Susanna Vespucci-Giradello/calgarielladio@alice.it/Villadoro/228.61.126.11
Marta Sgarbi/palomaferrara@fastwebnet.it/Pieve Di Compito/119.239.214.137
Dott. Marcantonio Brancaccio/amedeo28@alice.it/San Corrado Di Fuori/13.194.107.120
Loretta Gilardoni/galfieri@libero.it/San Zeno Di Montagna/253.86.48.118
Iolanda Vivaldi/umberto74@fastwebnet.it/Caselle In Pittari/182.126.155.235
Enzio Sordi/greca68@virgilio.it/Varna/211.78.113.51
Dott. Concetta Comisso/farinellimarcello@tim.it/Villa Bozza/49.165.138.218
Raffaella Cocci-Moccia/liguorinatalia@poste.it/San Martino Al Cimino/41.222.156.94
Vittoria Tencalla/bbonaventura@alice.it/Delia/254.241.85.225
Fulvio Ferrucci/achilleubaldi@alice.it/Roccalvecce/22.91.94.35
Dott. Massimo Correr/ebiagi@alice.it/Citta' Del Mare/90.122.224.117
Priscilla Gagliano/nino24@poste.it/Pratovecchio/138.59.221.227
Romana Bossi/navarriaottone@vodafone.it/Montemerlo/103.226.101.243
Tonino Vivaldi/ninetta96@tim.it/Caminata/116.216.115.118
Temistocle Lucarelli-Tanzini/giadastrangio@vodafone.it/Marina Di Savelletri/127.124.191.219
Mariana Palombi/vcontrafatto@tim.it/Casolla/105.58.158.218
Dott. Ferdinando Cannizzaro/delfino84@libero.it/Nesso/195.146.212.173
Fernanda Rienzo-Farina/federica75@tin.it/Milici/192.131.144.230
Dott. Renzo Guariento/alfio18@libero.it/Cana/110.65.181.148
Teresa Rossini/bverga@alice.it/Quiliano/160.204.171.58
Goffredo Bosio-Badoer/gian44@vodafone.it/Pignola/122.255.187.93
Dott. Claudia Romiti/gleopardi@libero.it/Sant'Arcangelo Di Magione/112.52.60.126
Ivo Milanesi/jacopoavogadro@virgilio.it/Vigolo Vattaro/177.124.206.20
Luca Rienzo-Verri/enniocaruso@poste.it/Ca' Gallo/189.193.173.64
Matteo Abatantuono/zmontecchi@vodafone.it/Fontanaluccia/38.131.45.9
Dott. Leonardo Martinelli/tgaltarossa@tim.it/Tossignano/218.142.182.163
Delfino Spallanzani/nannimaglio@fastwebnet.it/Vo'/115.228.225.100
Roman Nicoletti/anapolitano@virgilio.it/Villaggio Aldisio/141.140.190.149
Rossana Monti/nbonaventura@tin.it/Frassinelle Polesine/37.118.145.188
Arnaldo Panicucci/gcatalano@tim.it/Collina/243.56.162.34
Sig.ra Tatiana Onio/ubalotelli@virgilio.it/Brusatasso/218.213.136.92
Laura Camuccini/jolanda11@poste.it/Sant'Ilario D'Enza/55.48.93.70

Отработка 160 гигового лога под линуксом, параметры системы в скрине:

1727260593755.png
 
Последнее редактирование:
Залил обнову мультипроцессорной версии, поправил баг на генерации копий временных файлов в процессе слияния просидев в отладчике около 12 часов😰, что создавало много лишних копий в папке TEMP и на финальном процессе слияния в теории при размере на файле в 1 терабайт могло вызвать исключение на функции финального слияния в экспортируемый файл. Данная обнова по замерам на маленьком файле в 2 гига никак не повлияла на скорость, а вот на 200 гиговом логе в одинаковых условиях скорость выросла в полтора раза.

Результаты отработки 200гб лога до обновления:
1727371534649.png

Результаты после обновления:
1727371582815.png
 
Последнее редактирование:
Как программа объединит 7 блоков (что означает, что она создаст нечетное количество файлов TMP)? Например, функция "read_and_process_chunks_multiprocess" создаст 7 файлов TMP, тогда как будет действовать "batch_merge" в это время?
 
Последнее редактирование:
Как программа объединит 7 блоков (что означает, что она создаст нечетное количество файлов TMP)? Например, функция "read_and_process_chunks_multiprocess" создаст 7 файлов TMP, тогда как будет действовать "batch_merge" в это время?
Не понял вашего вопроса. По логике если останется 10 файлов или меньше в папке TEMP, должна отработать функция final_merge, и произойти финальное слияние в output.txt, если же файл остается всего 1, он переименовывается в выходной.
Посмотрите на этот кусок кода, и как Вы посмотрели хайд?

Python:
while len(temp_files) > batch_size:  # Пока временных файлов больше, чем размер пакета
        temp_files, unique_count, duplicate_count = batch_merge(temp_files, batch_size, temp_dir, num_merge_processes)  # Выполняем пакетное слияние
        total_unique_count += unique_count  # Обновляем общий счетчик уникальных строк
        total_duplicate_count += duplicate_count  # Обновляем общий счетчик дубликатов
 
Последнее редактирование:
Спасибо за предоставленный мне код
Очень надежный и информативный скрипт
Многопроцессорная обработка на моем железе добавляет около 30-40% к скорости в зависимости от настроек обработки по сравнению с однопоточным аналогом.

Я также захотел разобраться в коде и посмотреть, что можно оптимизировать.
Далее мнение дилетанта и некоторые вопросы по коду. Поскольку я не вижу хайд, я буду обсуждать код который у меня есть:

Начал я с функции обработки кусков:
Python:
def process_chunk_and_write_multiprocess(chunk, temp_dir):
    ...
    unique_items = set(chunk)
    sorted_chunk = sorted(unique_items)
    ...
Я не нашел никакого смысла в использовании set(chunk), поскольку за удаление дублей отвечает другой код. Если все таки избавиться от set(), то наиболее эффективно будет использовать chunk.sort() без переназначения и копирования, то есть без sorted_chunk = sorted(chunk).
Это снизит нагрузку на процессор и уменьшит расход оперативной памяти. Общая скорость скрипта не вырастет, т.к. на хорошем железе многопроцессорность легко компенсирует эту потерю эффективности.
Если же set() зачем то нужен, то следует переназначать chunk, вместо использования разных переменных, это позволит высвобождать оперативную память быстрее, поскольку перед выходом из функции идет задержка с записью в файл.

По удалению дублей также есть вопрос.
Функция слияния использует удаление дублей(merge_files_parallel) как для пакетного, так и для финального слияния. То есть, каждый раз когда мы сливаем пакетные файлы мы также удаляем дубли. Но достаточно сделать это один раз при финальном слиянии. Эта задержка также почти полностью компенсируется многопроцессорностью, но оптимизировать стоит.

Я добавил флаг final в параметры для функции удаления дублей и ограничил им функцию удаления дублей. Я также экспериментировал с алгоритмами для удаления дублей и остановился на лаконичном:
Python:
if final:
    for line, group in itertools.groupby(merged_iter):
        outfile.write(line)

На мой субъективный взгляд это работает капельку быстрее и выглядит лаконичнее. Но стоит заметить, groupby не использует strip() при сравнениях. Но поскольку мы уже применяли strip() при нарезке кусков, то сравнения будут считаться корректными.

Я пробовал добавить многопроцессорную обработку для финального слияния, используя threading.Lock(). И в процессхакере это выглядело очень даже хорошо с точки зрения использования I\O(не было снижений скорости или пауз как в однопотоке, скорость чтения отличная), но по итогу на файле в 30 гб без пакетных слияний это совсем не отразилось

Numpy также не смог поднять скорость скрипта, даже не смотря на фиксированные размеры массива и длину строк.

В итоге стало понятным, что скрипт можно сделать более эффективным с точки зрения использования ресурсов, но не намного быстрее.
Единственное, что значительно улучшило общую скорость - это не использование пакетного слияния без особой необходимости.
Поскольку ограничением для heapq.merge является только OS(512 и 2048 файловых дескрипторов для Windows x32 и x64), то повышение этой планки до этих лимитов повышает скорость скрипта на 25-40%.
А в случае обработки действительно больших словарей выгоднее увеличить размер куска в 10+ раз, чем допустить хоть одно слияние. Этого вполне хватит, чтобы обработать даже очень большие словари без слияния. Если позволит оперативная память, конечно.
 
Спасибо за предоставленный мне код
Очень надежный и информативный скрипт
Многопроцессорная обработка на моем железе добавляет около 30-40% к скорости в зависимости от настроек обработки по сравнению с однопоточным аналогом.

Я также захотел разобраться в коде и посмотреть, что можно оптимизировать.
Далее мнение дилетанта и некоторые вопросы по коду. Поскольку я не вижу хайд, я буду обсуждать код который у меня есть:

Начал я с функции обработки кусков:
Python:
def process_chunk_and_write_multiprocess(chunk, temp_dir):
    ...
    unique_items = set(chunk)
    sorted_chunk = sorted(unique_items)
    ...
Я не нашел никакого смысла в использовании set(chunk), поскольку за удаление дублей отвечает другой код. Если все таки избавиться от set(), то наиболее эффективно будет использовать chunk.sort() без переназначения и копирования, то есть без sorted_chunk = sorted(chunk).
Это снизит нагрузку на процессор и уменьшит расход оперативной памяти. Общая скорость скрипта не вырастет, т.к. на хорошем железе многопроцессорность легко компенсирует эту потерю эффективности.
Если же set() зачем то нужен, то следует переназначать chunk, вместо использования разных переменных, это позволит высвобождать оперативную память быстрее, поскольку перед выходом из функции идет задержка с записью в файл.

По удалению дублей также есть вопрос.
Функция слияния использует удаление дублей(merge_files_parallel) как для пакетного, так и для финального слияния. То есть, каждый раз когда мы сливаем пакетные файлы мы также удаляем дубли. Но достаточно сделать это один раз при финальном слиянии. Эта задержка также почти полностью компенсируется многопроцессорностью, но оптимизировать стоит.

Я добавил флаг final в параметры для функции удаления дублей и ограничил им функцию удаления дублей. Я также экспериментировал с алгоритмами для удаления дублей и остановился на лаконичном:
Python:
if final:
    for line, group in itertools.groupby(merged_iter):
        outfile.write(line)

На мой субъективный взгляд это работает капельку быстрее и выглядит лаконичнее. Но стоит заметить, groupby не использует strip() при сравнениях. Но поскольку мы уже применяли strip() при нарезке кусков, то сравнения будут считаться корректными.

Я пробовал добавить многопроцессорную обработку для финального слияния, используя threading.Lock(). И в процессхакере это выглядело очень даже хорошо с точки зрения использования I\O(не было снижений скорости или пауз как в однопотоке, скорость чтения отличная), но по итогу на файле в 30 гб без пакетных слияний это совсем не отразилось

Numpy также не смог поднять скорость скрипта, даже не смотря на фиксированные размеры массива и длину строк.

В итоге стало понятным, что скрипт можно сделать более эффективным с точки зрения использования ресурсов, но не намного быстрее.
Единственное, что значительно улучшило общую скорость - это не использование пакетного слияния без особой необходимости.
Поскольку ограничением для heapq.merge является только OS(512 и 2048 файловых дескрипторов для Windows x32 и x64), то повышение этой планки до этих лимитов повышает скорость скрипта на 25-40%.
А в случае обработки действительно больших словарей выгоднее увеличить размер куска в 10+ раз, чем допустить хоть одно слияние. Этого вполне хватит, чтобы обработать даже очень большие словари без слияния. Если позволит оперативная память, конечно.
Это все конечно хорошо, но покажи исходник. Set на этапе обработки чанков делает удаление, посмотри про уникальность множеств в set(). Ограничение по файловым дескрипторам можно обходить в системе. При увеличении размера чанка скорость может как падать так и повышаться, я уже писал об этом. Все зависит от железки. Выбрал наиболее оптимальный вариант в 2 миллиона строк на чанк. ИМХО единственное что сможет поднять скорость скрипта, это обработка на GPU. Если есть время, попробуй код перевести на GPU:) У меня есть такая идея, но сейчас загружен по работе:(
Спасибо что попытался вникнуть и что-то улучшить. :)
 
Последнее редактирование:
Это все конечно хорошо, но покажи исходник. Set на этапе обработки чанков делает удаление, посмотри про уникальность множеств в set().
Я это понимаю, но зачем нужно удаление на уровне чанков? Ведь если хоть один дупликат не будет находиться в пределах одного куска, то set() полностью теряет смысл
Python:
def process_chunk_and_write_multiprocess(chunk, temp_dir):
    try:
        logger.info(f"Начало обработки чанка размером {len(chunk)} строк (процесс)")
        heavy_computation(10000000)  # Симуляция тяжелых вычислений
        chunk.sort()
        unique_filename = os.path.join(temp_dir, f"tempfile_{uuid.uuid4()}.tmp")  # Генерируем уникальное имя файла
        with open(unique_filename, 'w', encoding='utf-8', errors='replace') as file:  # Сохранение в файл
            for line in chunk:
                file.write(line)
        return unique_filename
    except Exception as e:
        logger.error(f"Ошибка при обработке чанка: {e}")
        return None
Вместо этого мы полностью убираем дупликаты при финальном слиянии, не зависимо от их местоположения в оригинальном файле:
Python:
def merge_files_parallel(temp_files, output_file,  chunk_size=2000000, num_threads=24, final=False):
    try:
        logger.info(f"Параллельное слияние {len(temp_files)} временных файлов с использованием {num_threads} потоков")
        unique_count = 0
        duplicate_count = 0

        with ExitStack() as stack, open(output_file, 'a', encoding='utf-8', errors='replace') as outfile:
            file_iters = [stack.enter_context(open(temp_file, "r", encoding='utf-8', errors='replace')) for temp_file in temp_files]
            merged_iter = heapq.merge(*file_iters)
            if final:
                for line, group in groupby(merged_iter):
                    outfile.write(line)
                    unique_count+=1
                    #duplicate_count+=len(list(group))-1 #при желании раскомментировать
            else:
                for line in merged_iter:
                    outfile.write(line)
                
        logger.info(f"Слияние завершено. Уникальных строк: {unique_count}, дублей удалено: {duplicate_count}")
        return unique_count, duplicate_count, output_file

    except Exception as e:
        logger.error(f"Ошибка при параллельном слиянии файлов: {e}")
        return 0, 0
 
Я это понимаю, но зачем нужно удаление на уровне чанков? Ведь если хоть один дупликат не будет находиться в пределах одного куска, то set() полностью теряет смысл
Потому-что на границах слияния файлов могут быть погрешности, и чистку нужно делать на 3х этапах работы алгоритма (независимо будут ли дубли на этапе создания чанков или нет). Формирование чанков, пакетное слияние, финальное слияние. В другом случае по-моему возникали погрешности при тестах на крупных файлах на размерах от 100 гигабайт. (Да я вспомнил, с antikrya когда тестировали, он жаловался что дубли чистятся не все в его 200 гиговом файле, мне пришлось придумать такое решение).

P.S. Если есть форк который работает лучше моего решения, и оттестирован с образцами от 100 до 300 гигабайт, сбрасывай в эту ветку. Я думаю тебе будут тут многие благодарны. =) Версию тебе в ПМ сбрасывал идентично той что под хайдом. У меня же сейчас вообще нет возможностей как по времени так и финансово тестировать такие объемы и что-то улучшать, ибо сервак который я покупал для тестов в райде в 2 терабайта для этого алгоритма истек недели 2 назад. А платить 80 баксов в месяц накладно.
 
Последнее редактирование:
Потому-что на границах слияния файлов могут быть погрешности, и чистку нужно делать на 3х этапах работы алгоритма (независимо будут ли дубли на этапе создания чанков или нет). Формирование чанков, пакетное слияние, финальное слияние. В другом случае по-моему возникали погрешности при тестах на крупных файлах на размерах от 100 гигабайт. (Да я вспомнил, с antikrya когда тестировали, он жаловался что дубли чистятся не все в его 200 гиговом файле, мне пришлось придумать такое решение)
На сколько я могу судить, погрешность может возникать, только если нарушена сортировка, либо из-за ошибки кодировки, невидимых спецсимволов и тд, потому что финальный алгоритм прост и надежен. Если ошибается он, значит что-то не в порядке. Но heapq.merge тоже ориентируется на отсортированные куски. Он не должен ошибаться, потому что нет какой либо гонки и тд.
Я сейчас смотрю на оригинальный код и вижу:
Python:
def process_chunk_and_write_multiprocess(chunk, temp_dir):
        '''
        with open(unique_filename, 'w', encoding='utf-8') as temp_file:
            temp_file.write("\n".join(sorted_chunk) + "\n")
        '''
Думаю, сортировка нарушается из-за лишней "\n". Это как раз граница куска, как вы и говорили
 
На сколько я могу судить, погрешность может возникать, только если нарушена сортировка, либо из-за ошибки кодировки, невидимых спецсимволов и тд, потому что финальный алгоритм прост и надежен. Если ошибается он, значит что-то не в порядке. Но heapq.merge тоже ориентируется на отсортированные куски. Он не должен ошибаться, потому что нет какой либо гонки и тд.
Я сейчас смотрю на оригинальный код и вижу:
Python:
def process_chunk_and_write_multiprocess(chunk, temp_dir):
        '''
        with open(unique_filename, 'w', encoding='utf-8') as temp_file:
            temp_file.write("\n".join(sorted_chunk) + "\n")
        '''
Думаю, сортировка нарушается из-за лишней "\n". Это как раз граница куска, как вы и говорили
Ну поэксперементируйте (в чужих логах мне копаться права не давали во время тестов (но было много ошибок при их обработке), хотя я это и предполагал, поэтому сделал более универсальный вариант который жрет логи и с мусором и образцово-сгенерированные рандомным генератором). Еще просьба попробовать как подметил Guron_18 мне в ПМ обрабатывать строки вместо:
Python:
open(f, 'r', encoding='utf-8', errors='replace')

Вот так в бинарном режиме:
Python:
open(file=file, mode='r+b')

Думаю, сортировка нарушается из-за лишней "\n". Это как раз граница куска, как вы и говорили
Не должно, это было сделано для того чтобы в финале после создания файла в конце всегда был переход каретки. =\

P.S. Кстати тут подсказывали использовать фильтр Блума, я пробовал. Скорость была медленней процентов на 10-20, а погрешность выше....поэтому не стал публиковать.
 
Последнее редактирование:
Ну поэксперементируйте (в чужих логах мне копаться права не давали во время тестов (но было много ошибок при их обработке), хотя я это и предполагал, поэтому сделал более универсальный вариант который жрет логи и с мусором и образцово-сгенерированные рандомным генератором). Еще просьба попробовать как подметил Guron_18 мне в ПМ обрабатывать строки вместо:
Python:
open(f, 'r', encoding='utf-8', errors='replace')

Вот так побайтово:
Python:
open(file=file, mode='r+b')
Я протестировал на файле в 3 гб. С лишним пробелом или без, каждый прогон давал разные результаты на одном и том же файле. Разница невелика, в 4-37 файлов, но она ведь есть и это странно
Без пакетной сортировки он находит на 100-300 дублей больше, но все равно каждый раз по разному.
Мда, мой скрипт оказывается страдает тем же, а его путь более отслеживаемый и я не понимаю, где может быть проблема
Также заметил, что set() иногда убирает дубли, которые основной алгоритм пропускает. Но при этом сам set() отсеивает еще меньше в одиночку.
Возможно проблема в кодировке файла, там местами присутствуют кракозябры.
Это может быть интересным, open(file=file, mode='r+b'), потому что чтение пойдет быстрее и нужно будет лишь придумать алгоритм для расшифровки строк, либо только для итерации, чтобы обрабатывать хекс, если это будет так работать. Также стоит рассмотреть cchardet или charset-normalizer, чтобы динамически определять кодировку файла. Может, ansi строки будут читаться и записываться быстрее, обычно этого достаточно.
 
Я протестировал на файле в 3 гб. С лишним пробелом или без, каждый прогон давал разные результаты на одном и том же файле. Разница невелика, в 4-37 файлов, но она ведь есть и это странно
Без пакетной сортировки он находит на 100-300 дублей больше, но все равно каждый раз по разному.
Мда, мой скрипт оказывается страдает тем же, а его путь более отслеживаемый и я не понимаю, где может быть проблема
Также заметил, что set() иногда убирает дубли, которые основной алгоритм пропускает. Но при этом сам set() отсеивает еще меньше в одиночку.
Возможно проблема в кодировке файла, там местами присутствуют кракозябры.
Это может быть интересным, open(file=file, mode='r+b'), потому что чтение пойдет быстрее и нужно будет лишь придумать алгоритм для расшифровки строк, либо только для итерации, чтобы обрабатывать хекс, если это будет так работать. Также стоит рассмотреть cchardet или charset-normalizer, чтобы динамически определять кодировку файла. Может, ansi строки будут читаться и записываться быстрее, обычно этого достаточно.
Вам нужно сделать образцовый файл, который обработан в памяти тем же сетом например и уже по нему сверяться применяя свои изменения, если расхождения например есть хотя-бы в 1 байт на выходе от образцового, значит алгоритм работает неправильно.=) Я же это протестировал на размерах 1, 100, 300 гигабайт (большие файлы генерировал рандомными генераторами и мерджил в них дубли по рандомным участкам). Все было ок. Также при повторных сканированиях исходящих файлов всегда должно быть 0 дублей по счетчику, если они есть, значит что-то отработало неверно при первой обработке. По мне лучше сосредоточится на GPU вычислениях, ну если найдете способ который работает быстрее и без погрешностей, будет круто, я же остановился на том варианте который под хайдом, и натыкался на те-же проблемы с которыми столкнулись Вы. =)

P.S. Еще можете попробовать пофиксить копию алгоритма на C#, там есть баги по счетчикам расчета дублей (хотя сам скрипт по образцам отрабатывает идеально) которые мне лень пока исправлять, но также на этом языке можно попробовать его ускорить (хайд снял, по мне доработать эту версию перспективнее так как в шарпе нет питоновского GIL и не нужно заморачиваться со всякими мультипроцессами для его обхода): https://xss.pro/threads/123341/
 
Последнее редактирование:
Очень хорошая идея, а главное полезное, думаю удобно было бы использовать инструмент, чтобы сортировать поддомены в файлах(например у гарварда их больше 1000)
 
Видел что эти исходники раскопированы на некоторых "кидальных" форумах, так что смысла как я понял в этих хайдах нет при преме за 120$ дающий право смотреть их. (Если только в небольших, чтобы мотивировать новичков писать что-то полезное). Хайды сняты.
 
Последнее редактирование:
Поиграл с кодом
- Добавил на выбор бинарный режим. Это оказалось очень хорошей идеей. В среднем, скорость выросла на 40% при нарезке кусков. При финальном слиянии эффект менее заметен, там узкое место - это входящий поток

- Добавил передачу кусков в процессы через multiprocessing.JoinableQueue, это повысило скорость, а также почти вдвое сэкономило оперативную память. (тот же Queue, но с отслеживанием выполнения задач, потому что бинарное чтение настолько быстрое, что может крашнуть синхронизацию)

- Процессы больше не закрываются, а получают куски в цикле, что тоже экономит время на их инициализацию.

- Добавил определение кодировки. Может, пригодится где.

- Добавил удаление дублей сразу из нескольких файлов (Частенько надо именно это)

- Добавил финальное слияние в несколько файлов (для скармливания программам по частям, иногда надо)

- Добавил на выбор режим многопроцессорного финального слияния. По итогу однопоток все равно быстрее, но может пригодиться при разбивке на несколько файлов

В целом, благодаря этому получилась очень достойная скорость. На моем железе файл в 32 гб обрабатывается за 15 минут. Этот же файл при обработке оригинальным скриптом с дефолтными настройками(с множеством пакетных слияний) обрабатывался 55 минут и около 36 минут без пакетного слияния.

Минусы:
- Выбор режимов несколько усложнил и запутал код
- Для скорости проектирования параметры были сделаны глобальными переменными, но так делать неправильно, особенно с многопоточностью.
- Бинарный режим читает байты, конвертирует их в строки, а потом опять в байты, записывает, читает построчно, отсеивает дубликаты, конвертирует в байты и записывает... Из-за этого происходит чехарда с кодированием и декодированием, весом файла и тд. Конечное количество уникальных строк плюс-минус сходится, но общее количество прочитанных строк и особенно вес файла - нет. Это вызывает некоторые опасения, но в целом я склонен доверять логике скрипта.
- На разном железе стоит поиграться с настройками, в зависимости от мощности процессора и оперативной памяти, размера файла и тд.
- Пакетное слияние мало протестировано
- Основной целью скрипта является повышение скорости, а не вылавливание 100% дублей. Поэтому этот этап был плохо протестирован. Хочется думать, результат приближен к 99,9%


Постарался описать настройки интуитивно понятным языком
Если коротко, то идея такая:
Можно указать 1 файл в input_file или несколько в more_input_files через []
По умолчанию, нужно создать папку INPUT и поместить файлы там, они подтянутся в more_input_files
Есть два режима чтения-записи и два режима финального слияния.
По умолчанию выбраны оптимальные значения(для моего железа, потребностей и вкуса)

Python:
import os
import time
import uuid
import heapq
import shutil
import logging
from contextlib import ExitStack
from collections import namedtuple
from colorlog import ColoredFormatter
from charset_normalizer import from_bytes
from multiprocessing import Lock, Value, Process, JoinableQueue
from itertools import islice, groupby, cycle
from concurrent.futures import ProcessPoolExecutor, as_completed


def get_unique_filename():  # Функция для генерации уникального имени файла
    return os.path.join(temp_dir, f"temp_file_{uuid.uuid4().hex}.tmp")


def get_files_path(file_path):  # Функция для получения списка всех файлов во временной папке
    return [os.path.join(file_path, f) for f in os.listdir(file_path) if os.path.isfile(os.path.join(file_path, f))]


def generate_file_paths(file_path, count): # функция для генерации имен выходного файла
    base_name, ext = os.path.splitext(file_path)
    directory = os.getcwd() if not os.path.isabs(file_path) else os.path.dirname(file_path)
    if more_output_files > 1:
        return [os.path.join(f"{directory}\\{output_file_dir}", f"{base_name}_{i + 1}{ext}") for i in range(count)]
    else:
        return [os.path.join(f"{directory}\\{output_file_dir}", f"{base_name}{ext}")]


def worker_final(chunk_queue, files, unique_count, locked): # Функция многопоточного финального слияния
    try:
        outfile_cycle = cycle(files)
        while True:
            chunk = chunk_queue.get()
            if chunk is None:
                break
            chunk = ''.join(line for line, _ in groupby(chunk))
            unique_count.value += chunk.count('\n')
            with locked:
                current_file = next(outfile_cycle)
                with open(current_file, mode.write_mode, encoding=mode.local_encoding, errors=mode.error) as outfile:
                    if bin_mode:
                        outfile.write(chunk.encode(mode.global_encoding))
                    else:
                        outfile.write(chunk)
    except Exception as e:
        logger.error(f"Ошибка при параллельной обработке финальных кусков {e}")


def parallel_final_merge(merged_iter): # функция для запуска многопоточного финального слияния
    processes = []
    chunk_queue = JoinableQueue()
    for _ in range(num_processes):
        p = Process(target=worker_final, args=(chunk_queue, generate_file_paths(output_file, more_output_files) ,total_unique_count, lock,))
        p.start()
        processes.append(p)
    while True:
        chunk = list(islice(merged_iter, chunk_size))
        if not chunk:
            for _ in range(num_processes):
                chunk_queue.put(None)
            break
        if chunk_queue.qsize():
            chunk_queue.join()
        chunk_queue.put(chunk)
    for p in processes:
        p.join()

def standard_final_merge(merged_iter): # функция для однопоточного финального слияния
    with ExitStack() as stack:
        files_cycle = cycle(
            [stack.enter_context(open(final_file, mode.write_mode, encoding=mode.local_encoding, errors=mode.error))
             for final_file in generate_file_paths(output_file, more_output_files)])
        while True:
            chunk = ''.join(islice((line for line, _ in groupby(merged_iter)), chunk_size))
            if not chunk:
                break
            total_unique_count.value += chunk.count('\n')
            outfile = next(files_cycle)
            if bin_mode:
                outfile.write(chunk.encode(mode.global_encoding))
            else:
                outfile.write(chunk)


def batch_merge(merged_iter, outfile_name): # функция для многопоточного пакетного слияния
    with open(outfile_name, 'w', encoding=mode.global_encoding, errors='replace') as outfile:
        outfile.writelines(line for line in merged_iter)


def main_merge(temp_files, final=False): # функция для запуска слияния
    try:
        logger.info(f"Запущено слияние {len(temp_files)} временных файлов")
        with ExitStack() as stack:
            file_iters = [stack.enter_context(open(temp_file, "r", encoding=mode.global_encoding, errors='replace')) for temp_file
                          in temp_files]
            merged_iter = heapq.merge(*file_iters)
            if final:
                if final_thread_mode:
                    parallel_final_merge(merged_iter)
                else:
                    standard_final_merge(merged_iter)
            else:
                batch_merge(merged_iter, get_unique_filename())
    except Exception as e:
        logger.error(f"Ошибка слияния файлов: {e}")


def batch_threads_merge(): # функция для запуска многопоточного пакетного слияния
    try:
        with ProcessPoolExecutor(max_workers=num_processes) as executor:
            futures = []
            temp_files = get_files_path(temp_dir)
            for i in range(0, len(temp_files), batch_size):
                batch = temp_files[i:i + batch_size]
                futures.append(executor.submit(main_merge, batch, False))
            for _ in as_completed(futures):
                logger.info(f"Пакет собран!")

        for temp_file in temp_files:
            logger.info(f"Удаление временного файла: {temp_file}")
            os.remove(temp_file)

    except Exception as e:
        logger.error(f"Ошибка пакетного слияния файлов: {e}")
        return temp_files


def worker_cut_bin(chunk_queue, orig_count): # функция многопоточной нарезки кусков в бинарном режиме
    try:
        while True:
            chunk = chunk_queue.get()
            chunk_queue.task_done()
            if chunk is None:
                break
            chunk = chunk.decode(mode.global_encoding).split('\n')
            logger.info(f"Начало обработки куска размером {len(chunk)} строк (процесс)")
            orig_count.value += len(chunk)
            chunk.sort()
            chunk = ''.join(f"{line.strip()}\n" for line in chunk).encode(mode.global_encoding)
            with open(get_unique_filename(), 'wb') as file:
                file.write(chunk)

    except Exception as e:
        logger.error(f"Ошибка при обработке куска: {e}")

def worker_cut(chunk_queue, orig_count): # функция многопоточной нарезки кусков в построчном режиме
    try:
        while True:
            chunk = chunk_queue.get()
            chunk_queue.task_done()
            if chunk is None:
                break
            logger.info(f"Начало обработки куска размером {len(chunk)} строк (процесс)")
            orig_count.value += len(chunk)
            chunk.sort()
            chunk = ''.join(f"{line}\n" for line in chunk)
            with open(get_unique_filename(), 'w', encoding=mode.global_encoding, errors='replace') as file:
                file.write(chunk)
    except Exception as e:
        logger.error(f"Ошибка при обработке куска: {e}")


def cut_chunks(file_path):  #функция для запуска многопоточной нарезки кусков
    with open(file_path, mode.read_mode, encoding=mode.local_encoding, errors=mode.error) as infile:
        processes = []
        chunk_queue = JoinableQueue()
        for _ in range(num_processes):
            process = Process(target=mode.worker, args=(chunk_queue, total_orig_count))
            process.start()
            processes.append(process)
        while True:
            chunk = mode.cut_read_func(infile)
            if not chunk:
                for _ in range(num_processes):
                    chunk_queue.put(None)
                break
            if chunk_queue.qsize():
                chunk_queue.join()
            chunk_queue.put(chunk)

        for process in processes:
            process.join()

def read_text_chunk(infile):
    return [line.strip() for line in islice(infile, chunk_size)]

def read_binary_chunk(infile):  # читаем файл в бинарном режиме
    chunk = infile.read(chunk_size * byte_coefficient)
    if not chunk:
        return None
    if chunk[-1] != b"\n":  # если есть недочитанная строка
        chunk += last_line(infile)  # дочитываем ее
    return chunk


def last_line(infile):  # функция для дочитывания последней строки
    byte_buffer = b""
    while True:
        next_byte = infile.read(1)
        if not next_byte or next_byte == b"\n":
            return byte_buffer + next_byte
        byte_buffer += next_byte



def create_service_dir(path):
    os.makedirs(os.path.join(os.getcwd(), path), exist_ok=True)


def clean_tempdir():
    try:
        shutil.rmtree(temp_dir)
    except Exception as e:
        logger.error(f"Ошибка при удалении временных файлов: {e}")


def get_mode(): # сервисная функция для подстройки под режим
    read_mode = 'rb' if bin_mode else 'r'
    write_mode = 'ab' if final_thread_mode and bin_mode else 'wb' if bin_mode else 'a' if final_thread_mode else 'w'
    global_encoding = get_encoding()
    local_encoding = None if bin_mode else global_encoding
    error = None if bin_mode else 'replace'
    cut_read = read_binary_chunk if bin_mode else read_text_chunk
    worker = worker_cut_bin if bin_mode else worker_cut

    return FileMode(read_mode=read_mode, write_mode=write_mode, global_encoding=global_encoding, local_encoding=local_encoding, error=error, cut_read_func=cut_read, worker=worker)

def get_encoding_from_file(file):  # Функция определения кодировки
    with open(file, 'rb') as f:
        file_bytes = f.read(1000)
    result = from_bytes(file_bytes).best()
    if result:
        return result.encoding
    else:
        return 'utf-8'

def get_encoding():
    temp_set = set()
    for file_path in more_input_files:
        temp_set.add(get_encoding_from_file(file_path))
    if len(temp_set) == 1:
        return temp_set.pop()
    else:
        return 'utf-8'

def get_logger():
    formatter = ColoredFormatter(
        "%(log_color)s%(asctime)s - %(levelname)s - %(message)s",
        log_colors={'DEBUG': 'cyan', 'INFO': 'green', 'WARNING': 'yellow', 'ERROR': 'red', 'CRITICAL': 'bold_red'}
    )
    handler = logging.StreamHandler()
    handler.setFormatter(formatter)
    log = logging.getLogger()
    log.addHandler(handler)
    log.setLevel(logging.INFO)
    return log

def main():
    try:
        tic = time.perf_counter()
        logger.info(f"Создаем папки...")
        create_service_dir(temp_dir)
        create_service_dir(output_file_dir)

        logger.info(f"Начата нарезка кусков...")
        if input_file:
            more_input_files.append(input_file) # если указан input_file, добавляем его в список,

        for file_path in more_input_files: # перебираем входные файлы
            cut_chunks(file_path)  #и нарезаем их на куски

        while len(get_files_path(temp_dir)) > max_batch:  # если кусков больше, чем max_batch(2000 для windows x64), то сливаем пакеты по batch_size
            logger.info(f"Начато пакетное слияние временных файлов...")
            batch_threads_merge()  # то запускаем многопоточное пакетное слияние
        logger.info(f"Начато финальное слияние...")
        main_merge(get_files_path(temp_dir), True)

        tac = time.perf_counter()
        logger.info(
            f"\n\n\n\t\t###########################\n\t\t##Готово, вы великолепны!##\n\t\t###########################\n\n"
            f"\t\tВсего строк:\t{total_orig_count.value}\n\t\tУникальных:\t{total_unique_count.value}"
            f"\n\t\tДубликатов:\t{total_orig_count.value-total_unique_count.value}\n\t\t"
            f"Общее время:\t{tac - tic:.2f} сек.\n\n\n")  # подводим итоги
    except Exception as e:
        logger.error(f"Ошибка в главной функции: : {e}")
    finally:
        logger.info(f"Удаление временных файлов в {temp_dir}...")
        clean_tempdir()  # подтираем за собой





input_file = '' # указываем входной файл, либо используем more_input_files
bin_mode = True # включаем бинарный режим
final_thread_mode = False # многопроцессорное финальное слияние

output_file = "test_out.txt" # конечный файл
output_file_dir = "OUTPUT" # будет в этой папке

input_file_dir = "INPUT" # папка для входных файлов
more_input_files = get_files_path(input_file_dir) # берем все файлы из input_file_dir, можно заменить на [ваши файлы], либо использовать input_file
more_output_files = 1 # разбить финальное слияние на N файлов

chunk_size = 2000000 # максимум строк в куске
num_processes = 10 # количество процессов
max_batch = 2000 # максимум кусов без пакетного слияния
batch_size = max_batch // num_processes # максимум кусков в пакете
byte_coefficient = 15 # коэффициент умножения chunk_size на byte_coefficient, считаем что 1 строка состоит из 15 байт. В зависимости от специфики входного файла можно увеличить или уменьшить
total_orig_count = Value('i', 0) # считаем общее количество строк
total_unique_count = Value('i', 0) # считаем уникальные строки
temp_dir = "TEMP" # название папки для временных файлов
lock = Lock() # устанавливаем Lock
logger = get_logger() # устанавливаем логирование
FileMode = namedtuple('FileMode', ['read_mode', 'write_mode', 'global_encoding', 'local_encoding', 'error', 'cut_read_func', 'worker']) # переменные для выбора режимов
mode = get_mode() # подгоняем переменные под режим

if __name__ == "__main__":
    main()
 
Последнее редактирование:
искал буквально на неделе и вас нашел вот
https://nohide[.]space/threads/luch...raboty-s-tekstom-text-utils-pack-by-lays.250/

самый шикарный софт был в свое время, но как я понял сейчас всю дорогу зараженный...
мб у кого то есть такой чистенький?

а так же предлагаю тсу спрятать софт в пкм и будет просто пушка!
или может развить софт до такого же уровня как у Lays?

за удобство люди будут готовы платить деньги

сколько людей качало этот софт by Lays не счесть. прям народный
*может быть что удаление дублей там работало не оч
 
Последнее редактирование:


Напишите ответ...
  • Вставить:
Прикрепить файлы
Верх