- Автор темы
- Добавить закладку
- #21
Этот софт работает по временным файлам?Мне очень понравилось. Пользуюсь уже давно. Причём работает без сортировки.
И параметры самому можно выставить.
Этот софт работает по временным файлам?Мне очень понравилось. Пользуюсь уже давно. Причём работает без сортировки.
И параметры самому можно выставить.
Скорее всего.Этот софт работает по временным файлам?
В мультипроцессорной версии поправил обнаруженные баги, оптимизировал код, за счет этого чуть-чуть увеличилась скорость обработки алгоритма.Количество строк уникальных неправильно рассчитано.=)
Dott. Teresa Novaro/sanguinetisimonetta@tin.it/Bocenago/50.34.246.179
Dott. Paoletta Parpinel/xmorabito@virgilio.it/Giuliano Di Lecce/153.160.84.95
Michelotto Zito/calgariamalia@fastwebnet.it/Quattro Castella/98.12.70.87
Ermes Vattimo-Bompiani/gaspare51@poste.it/Salaparuta/155.233.251.224
Girolamo Prodi/giuliettaudinesi@virgilio.it/Solarino/239.140.189.253
Cassandra Rosiello/flaianovincenzo@alice.it/San Giacomo Di Spoleto/10.215.32.156
Vanessa Cesaroni/carmelo32@alice.it/Roraipiccolo/227.224.170.181
Maura Mercadante/conteludovico@tin.it/San Giorgio Della Richinvelda/112.61.44.170
Giuliana Rossetti/damianogonzaga@vodafone.it/Is Murdegus/156.253.48.96
Leonardo Schiavone/silvio63@virgilio.it/Piazza Del Galdo/68.42.159.62
Donatello Polesel/marta44@poste.it/Vezza D'Oglio/135.157.40.240
Angelo Chigi/napoleonecomeriato@virgilio.it/Budrie/77.28.147.237
Liliana Guidotti/ynicolini@fastwebnet.it/San Giacomo Roncole/108.69.232.28
Aurora Cammarata/nosigliasergius@poste.it/Ischia Ponte/252.125.118.59
Licia Colletti/xtamborini@alice.it/Garbagnate Monastero/163.149.79.234
Gian Tutino/qcasaleggio@vodafone.it/Denice/159.152.52.28
Stella Tosi-Serraglio/halfonsi@virgilio.it/Gazzada Schianno/177.33.104.102
Fernanda Staglieno/paulinaschiaparelli@tin.it/Nasidi/122.54.56.12
Sig.ra Milena Saragat/ugo85@poste.it/Torri Del Benaco/133.24.49.250
Dott. Milena Briccialdi/marcantonio17@libero.it/Casatenovo/174.111.142.188
Flora Barozzi/yricciardi@vodafone.it/Montegiordano Marina/26.74.230.105
Claudia Ferraris-Verdone/dpisani@virgilio.it/Figline Di Prato/234.237.0.201
Luigina Pizzo-Bossi/fmontanari@libero.it/Canonica D'Adda/10.57.145.77
Marta Sgarbi-Brunello/anitaalighieri@fastwebnet.it/Sciconi/109.155.223.64
Pierina Trincavelli-Ferraris/auroratrapani@tim.it/Corfinio/37.221.146.217
Romina Ossani/mazzacuratipina@vodafone.it/Nicosia/129.127.53.129
Pierina Deledda/baccio68@tin.it/Vighignolo/79.179.200.241
Roberto Munari-Solari/stefanoalighieri@vodafone.it/Serbadone/132.52.83.120
Licia Collodi/salandramaurilio@tin.it/Sutri/97.198.115.250
Germana Vespucci/guidomercadante@tin.it/Casinalbo/85.22.23.93
Romana Columbo/stefaniabriccialdi@fastwebnet.it/Castelbellino/89.238.204.173
Dott. Dolores Battisti/hpertile@libero.it/Canalicchio/173.190.108.109
Raffaella Biagiotti/vanessamazzanti@tim.it/San Nicandro Garganico/145.220.205.56
Azeglio Endrizzi/oariasso@tin.it/Niscemi/83.111.120.46
Etta Donati/cardanogustavo@poste.it/Budrione/90.109.207.243
Silvio Camuccini/virgilio41@libero.it/Pedalino/45.196.75.29
Danilo Campise/cilibrasierika@alice.it/Arcisate/211.187.249.215
Dott. Bianca Mennea/mduodo@virgilio.it/Occimiano/29.102.219.171
Raffaello Tamburini/manacordafabrizia@virgilio.it/Casumaro/8.95.40.13
Massimiliano Guglielmi/giannatorricelli@poste.it/La Muddizza/208.216.64.5
Danilo Almagià/ovalentino@tin.it/San Nicola/168.27.112.98
Ezio Gibilisco-Vigorelli/graziella45@alice.it/Carro/208.64.122.168
Ida Ruberto/ealfieri@libero.it/Cerreto Castello/236.232.212.218
Adele Montecchi/sabatino98@virgilio.it/Montabone/36.138.27.131
Paola Ferragamo-Gozzi/buscettagiulia@fastwebnet.it/Viadanica/133.199.141.213
Sylvia Lattuada-Bonolis/gorigreco@tim.it/San Vito Al Torre/204.218.9.34
Dott. Ivo Corradi/volterrastefania@libero.it/Bellizzi Irpino/173.71.237.187
Sig. Alessio Maggioli/giacobbecabibbo@tin.it/Clivio/21.159.179.146
Ernesto Altera/puccimaurilio@alice.it/Carditello/128.25.241.155
Patrizio Micheletti/kpriuli@alice.it/Tegoleto/64.145.145.15
Gian Traversa/marialeonardi@tin.it/Porto Valtravaglia/108.244.255.37
Flavio Serao/landolancisi@tim.it/Matierno/128.126.213.41
Sig.ra Martina Cugia/lazzaromantegna@alice.it/Belmonte Calabro Marina/100.111.229.255
Claudia Gottardi/isabellaluria@fastwebnet.it/Cusio/130.128.198.6
Telemaco Miniati/mpometta@libero.it/Apricale/202.239.109.246
Isa Salgari/gianmarcogermano@libero.it/San Patrizio/119.244.95.106
Durante Morandi-Beffa/liviabelletini@vodafone.it/Bernalda/64.4.198.200
Gianluca Branciforte-Traversa/pomponio23@vodafone.it/Druento/1.177.49.56
Marisa Agnesi/pasqualacerbi@libero.it/Su Cossu/165.95.182.48
Dott. Raffaellino Togliatti/trobbianimichela@alice.it/Sant'Alfio/15.70.213.155
Goffredo Zoppetti/arnaldotoldo@fastwebnet.it/Ca' De' Fabbri/254.134.137.195
Elmo Argan/atenulf54@tim.it/Ravarino/28.245.146.184
Dott. Panfilo Ortese/qsobrero@tin.it/Nola/196.193.82.208
Armando Redi/domenicopisacane@vodafone.it/Caprarico/111.201.129.236
Gino Bellocchio-Condoleo/porcellatoadriana@poste.it/Rosignano Monferrato/112.133.255.209
Augusto Camanni/maria74@tim.it/Senigallia/124.220.135.7
Napoleone Petrocelli/lcavanna@fastwebnet.it/Rosello/186.105.134.221
Tiziano Avogadro-Petralli/renata01@virgilio.it/Cannuzzo/199.88.147.49
Isabella Respighi/piergiorgiosperi@libero.it/Osimo Stazione/26.111.250.131
Gianluigi Mazzeo/etedesco@libero.it/Peregallo/23.151.3.169
Fiorenzo Fuseli-Cattaneo/baldassarefederico@virgilio.it/Asolo/7.97.11.200
Tullio Morpurgo-Morabito/cimarosaannalisa@fastwebnet.it/Piancaldoli/5.38.123.158
Amedeo Tropea/ubiagi@alice.it/Dragoni/223.101.126.214
Rosina Fuseli/acuda@tin.it/San Potito Sannitico/63.3.24.46
Ramona Desio/jboezio@virgilio.it/San Quirico Trecasali/219.207.3.80
Lucia Segni/priscilla75@tin.it/Tapogliano/227.115.109.26
Flora Proietti/tullio65@vodafone.it/Eremo Sant'Anna/253.77.218.86
Berenice Rubbia/girolamo28@poste.it/Laureana Di Borrello/27.191.158.216
Gastone Micheletti/sabatinovecellio@tin.it/Zerba/126.190.90.106
Benedetto Brancaccio/osvaldo33@tin.it/Pregiato/1.9.214.134
Lara Turrini/priuliguido@vodafone.it/Fabriano/48.10.60.68
Sig.ra Teresa Renier/costanzo25@alice.it/Oreno/205.9.100.132
Gemma Camiscione/atomasetti@alice.it/Vesime/33.186.212.9
Licia Berlusconi/inzaghitullio@virgilio.it/Palese/173.50.29.79
Pina Caffarelli/ugarzoni@vodafone.it/Parre Ponte Selva/97.240.226.63
Dott. Patrizia Basso/nolciniilaria@libero.it/Brufa/255.244.150.199
Anita Baroffio/catenazziantonina@libero.it/Zancona/149.142.89.242
Achille Gualandi-Collodi/arturo62@virgilio.it/Villa Passo/131.50.118.17
Donna Zetticci/gbarillaro@libero.it/Lajen/193.120.134.74
Nino Toso/nicoletta19@alice.it/Ciavolo/173.176.219.139
Berenice Piazzi/roccastella@tin.it/Casavatore/253.2.92.64
Imelda Faggiani/migliacciograziella@alice.it/Monte San Quirico/8.221.93.66
Carolina Farina/imelda18@virgilio.it/Lesignana/189.170.116.100
Diana Tamburi/melinarenier@fastwebnet.it/Macchia D'Isernia/129.140.11.28
Piero Canil/camicioneaugusto@tin.it/Saviore Dell'Adamello/220.23.13.61
Ninetta Ubaldi/zpisani@libero.it/Castiglione Di Ravenna/37.113.93.81
Ludovico Soprano/bellinienzio@fastwebnet.it/Sordio/29.146.93.77
Victoria Carpaccio/taliercioadriana@tim.it/Galgiana/81.130.155.76
Ferdinando Alonzi/cicalagiampiero@tin.it/Fabbrico/189.29.151.114
Priscilla Govoni-Giusti/traettaalina@poste.it/San Fermo/72.133.233.250
Giampiero Riccardi-Castioni/nmontalcini@virgilio.it/Lesa/107.152.55.87
Dott. Eva Zoppetto/vincenzopanicucci@poste.it/Corio/1.103.27.59
Dott. Ludovico Zola/edoardo00@libero.it/Bressa/109.201.79.63
Aldo Interminelli/ggiammusso@poste.it/Pile/48.47.113.247
Beatrice Sonnino/nadia16@tin.it/Barbania/63.103.137.231
Gastone Finotto/stoppaninicola@alice.it/Mergozzo/16.88.30.229
Elvira Faranda/boccionifabia@tin.it/Carlentini/255.17.216.3
Aurora Solimena/tagliafierrogeronimo@poste.it/Santa Chiara Di Nardo'/31.75.85.100
Vittoria Jovinelli/tcianciolo@libero.it/Revigliasco Torinese/190.178.155.197
Fulvio Troisi/teresalamborghini@poste.it/Breno/73.23.33.138
Alessio Bossi/donatellacasini@libero.it/Baragiano/38.188.134.12
Dott. Laureano Raimondi/ipersico@tin.it/Castelletto D'Orba/3.244.144.17
Sig. Rembrandt Montanari/amletorenier@poste.it/Bozen/106.245.162.42
Sig.ra Giulietta Barcaccia/lolita97@vodafone.it/Longobardi/198.135.81.184
Sig. Azeglio Olivetti/danilo04@libero.it/Cogollo/130.197.86.184
Sig. Orlando Cuda/cannizzarocorrado@poste.it/Gorzone/141.26.163.141
Panfilo Pontecorvo/bertoluccigianni@alice.it/Santa Maria Di Bobbio/78.7.34.249
Nicoletta Aulenti/adrianozanichelli@tim.it/Alice Superiore/122.238.27.250
Fabrizia Guidone/montesanocecilia@vodafone.it/Piani Di Borghetto/87.168.5.22
Azeglio Antonetti/romma@alice.it/Pontestazzemese/151.14.113.179
Massimo Pellico-Simeoni/zagurigianfranco@libero.it/Terrazze/99.23.218.248
Ranieri Piacentini/carolina70@poste.it/Sant'Andrea In Monte/15.1.102.208
Fabio Ginese/luxardoleone@vodafone.it/Fino Mornasco/50.72.129.123
Amedeo Sagese/croth@tin.it/Marengo/78.59.86.241
Nicola Tosto-Verga/guarino86@libero.it/Taisten/157.163.40.219
Ignazio Comisso/yburcardo@vodafone.it/Montebello Sul Sangro/210.48.204.100
Donatella Palazzo/ecrispi@vodafone.it/San Potito/113.26.109.81
Matilda Solari/neddagremese@tin.it/St. Peter in Ahrn/15.191.162.169
Serena Torlonia-Alighieri/germanaarmani@vodafone.it/Cerreto Sannita/47.219.225.21
Aldo Bignardi/romina55@fastwebnet.it/Ferrania/167.140.42.213
Bernardo Bertoni/pellicosole@libero.it/Villanova Del Sillaro/151.167.139.85
Sig.ra Nedda Montanari/pizzamanopriscilla@libero.it/Usago/180.249.186.165
Ennio Venturi/efantini@virgilio.it/Colli Di Suso/20.139.82.135
Jolanda Bosio/pompeo53@vodafone.it/Pflaurenz/68.110.32.70
Pellegrino Canevascini/massimiliano26@libero.it/Tesoriero/58.111.112.87
Dott. Hugo Crispi/tencallafrancesca@tim.it/Donori'/70.84.155.119
Sig.ra Adelasia Gussoni/francescodallape@fastwebnet.it/Avigliano Scalo/209.170.203.90
Sole Pepe/ninettadonati@vodafone.it/Siliqua/111.216.85.71
Dott. Livia Montanari/barbaranavarria@virgilio.it/Torre Del Pozzo/0.114.23.150
Ruggero Asprucci/hmigliaccio@tim.it/Borghetto Lodigiano/108.171.90.9
Paoletta Bruscantini/ybonino@tim.it/Onigo/24.195.176.225
Amico Spanevello/npelli@tim.it/Cavazzoli/183.120.195.179
Maria Campano/silvestro87@tin.it/Molochio/80.196.7.26
Antonella Emanuelli/ebeccheria@libero.it/Miazzina/77.89.13.88
Greca Amato/gianmarcofarinelli@alice.it/Villa Lagarina/244.242.248.55
Lilla Cociarelli-Roncalli/fpratesi@tin.it/Riola/58.177.200.209
Martino Sonnino/greco49@fastwebnet.it/Peschici/222.114.214.17
Leonardo Doria/ferdinandobuscetta@vodafone.it/Ponzano Monferrato/199.195.156.125
Alfredo Biagi/zoppettovincenzo@tin.it/Maissana/74.118.196.28
Orlando Carli-Gadda/lamborghinitatiana@vodafone.it/Stanghella/13.166.191.112
Orazio Basadonna/pomponio33@vodafone.it/Mezzocorona/237.194.191.168
Annunziata Porcellato/alberto72@poste.it/Bagnolo Di Lonigo/196.88.39.75
Ninetta Giradello/patriziaguinizzelli@tin.it/Vezzan/49.133.128.181
Giovanna Soffici/antonellodellucci@libero.it/Sant'Angelo In Villa/173.102.186.20
Matteo Boccaccio/nicolapetrucelli@virgilio.it/Aquileia/211.200.136.36
Telemaco Ruffini/mirco75@vodafone.it/Guidonia Montecelio/233.61.224.35
Cristina Benussi-Bottaro/tarmani@tim.it/Crema/175.40.216.161
Laureano Bottaro/wcremonesi@poste.it/Collemoresco/131.254.247.250
Alfredo Trapani/babatoadamo@libero.it/Portopalo Di Capo Passero/32.144.45.224
Sig.ra Tina Spallanzani/cutulidonatello@fastwebnet.it/Briano/195.167.231.206
Ennio Scandone/granatellimichele@tim.it/Cognola/13.196.2.248
Eva Ruberto/adelmogaluppi@virgilio.it/Premaor/255.231.238.80
Mariana Caruso/rpacomio@virgilio.it/Desenzano Al Serio/224.32.83.187
Alessandra Taliercio/mgolino@virgilio.it/Excenex/133.197.255.47
Filippa Balbo/paulina05@poste.it/Casalmaiocco/246.59.240.71
Diana Galtarossa/cirillo48@tin.it/Mondovi'/170.150.128.3
Bettina Giacometti/emma32@virgilio.it/Vitulazio/204.106.241.142
Gelsomina Garibaldi/sabatinoantonelli@alice.it/Riolo Terme/38.201.70.185
Vanessa Maspero/errigozaira@tim.it/Grignasco/129.76.238.62
Sig.ra Filippa Canali/valentina01@tin.it/Salcito/184.163.66.182
Annibale Golgi/xpacillo@virgilio.it/Vagli Sopra/242.171.129.43
Elena Poerio-Ruffini/vivaldimilo@vodafone.it/Calimera Calabra/31.226.236.197
Elladio Pacillo/satrianilivio@libero.it/Frasso Telesino/179.88.165.89
Pasquale Pelli/hrismondo@libero.it/Tanaunella/67.188.97.159
Dott. Elena Torricelli/nico11@virgilio.it/Lattughelle/93.136.156.69
Flavio Schiavone/martagiannotti@virgilio.it/Santa Maria Del Piave/0.92.206.226
Monica Gualtieri/peanosimonetta@libero.it/Santa Maria Lo Piano/119.231.217.92
Evangelista Bombieri/lorenzomilanesi@vodafone.it/Schlinig/23.125.115.200
Elvira Turati/otrezzini@tim.it/Mortizza/223.114.89.111
Adriana Bettin/xcainero@tim.it/Premosello Chiovenda/53.124.122.1
Fabrizia Caironi/vittorio20@tin.it/Monale/175.225.212.6
Ansaldo Vigliotti/benussimirco@fastwebnet.it/Ripa/101.208.55.99
Fiamma Gatto-Gagliardi/vanessasaragat@virgilio.it/Sirai/228.169.169.78
Dott. Ermes Paganini/casagrandealessio@tim.it/Lumignacco/68.187.4.133
Ludovica Villadicani-Balbo/loprestimichelangelo@fastwebnet.it/Montefiorino/30.52.160.236
Giorgio Prati/russomonica@fastwebnet.it/Azeglio/53.28.178.82
Michela Gagliardi/victoria14@libero.it/Capalbio Stazione/122.197.197.82
Silvia Curatoli/ggolgi@alice.it/Crocette/61.45.235.198
Raffaellino Mezzetta/vecolivittoria@fastwebnet.it/San Felice Sul Panaro/206.92.58.117
Antonello Pisani/wferrari@alice.it/Colombare Di Sirmione/228.95.63.117
Sophia Satriani-Monduzzi/crubbia@vodafone.it/Camini/133.229.37.220
Sante Cainero-Sonnino/federico98@vodafone.it/Fontana Fredda/11.92.145.87
Maurilio Carullo/aperini@tim.it/Windegg/19.105.15.203
Susanna Vespucci-Giradello/calgarielladio@alice.it/Villadoro/228.61.126.11
Marta Sgarbi/palomaferrara@fastwebnet.it/Pieve Di Compito/119.239.214.137
Dott. Marcantonio Brancaccio/amedeo28@alice.it/San Corrado Di Fuori/13.194.107.120
Loretta Gilardoni/galfieri@libero.it/San Zeno Di Montagna/253.86.48.118
Iolanda Vivaldi/umberto74@fastwebnet.it/Caselle In Pittari/182.126.155.235
Enzio Sordi/greca68@virgilio.it/Varna/211.78.113.51
Dott. Concetta Comisso/farinellimarcello@tim.it/Villa Bozza/49.165.138.218
Raffaella Cocci-Moccia/liguorinatalia@poste.it/San Martino Al Cimino/41.222.156.94
Vittoria Tencalla/bbonaventura@alice.it/Delia/254.241.85.225
Fulvio Ferrucci/achilleubaldi@alice.it/Roccalvecce/22.91.94.35
Dott. Massimo Correr/ebiagi@alice.it/Citta' Del Mare/90.122.224.117
Priscilla Gagliano/nino24@poste.it/Pratovecchio/138.59.221.227
Romana Bossi/navarriaottone@vodafone.it/Montemerlo/103.226.101.243
Tonino Vivaldi/ninetta96@tim.it/Caminata/116.216.115.118
Temistocle Lucarelli-Tanzini/giadastrangio@vodafone.it/Marina Di Savelletri/127.124.191.219
Mariana Palombi/vcontrafatto@tim.it/Casolla/105.58.158.218
Dott. Ferdinando Cannizzaro/delfino84@libero.it/Nesso/195.146.212.173
Fernanda Rienzo-Farina/federica75@tin.it/Milici/192.131.144.230
Dott. Renzo Guariento/alfio18@libero.it/Cana/110.65.181.148
Teresa Rossini/bverga@alice.it/Quiliano/160.204.171.58
Goffredo Bosio-Badoer/gian44@vodafone.it/Pignola/122.255.187.93
Dott. Claudia Romiti/gleopardi@libero.it/Sant'Arcangelo Di Magione/112.52.60.126
Ivo Milanesi/jacopoavogadro@virgilio.it/Vigolo Vattaro/177.124.206.20
Luca Rienzo-Verri/enniocaruso@poste.it/Ca' Gallo/189.193.173.64
Matteo Abatantuono/zmontecchi@vodafone.it/Fontanaluccia/38.131.45.9
Dott. Leonardo Martinelli/tgaltarossa@tim.it/Tossignano/218.142.182.163
Delfino Spallanzani/nannimaglio@fastwebnet.it/Vo'/115.228.225.100
Roman Nicoletti/anapolitano@virgilio.it/Villaggio Aldisio/141.140.190.149
Rossana Monti/nbonaventura@tin.it/Frassinelle Polesine/37.118.145.188
Arnaldo Panicucci/gcatalano@tim.it/Collina/243.56.162.34
Sig.ra Tatiana Onio/ubalotelli@virgilio.it/Brusatasso/218.213.136.92
Laura Camuccini/jolanda11@poste.it/Sant'Ilario D'Enza/55.48.93.70
, что создавало много лишних копий в папке TEMP и на финальном процессе слияния в теории при размере на файле в 1 терабайт могло вызвать исключение на функции финального слияния в экспортируемый файл. Данная обнова по замерам на маленьком файле в 2 гига никак не повлияла на скорость, а вот на 200 гиговом логе в одинаковых условиях скорость выросла в полтора раза.
read_and_process_chunks_multiprocess" создаст 7 файлов TMP, тогда как будет действовать "batch_merge" в это время?Не понял вашего вопроса. По логике если останется 10 файлов или меньше в папке TEMP, должна отработать функция final_merge, и произойти финальное слияние в output.txt, если же файл остается всего 1, он переименовывается в выходной.Как программа объединит 7 блоков (что означает, что она создаст нечетное количество файлов TMP)? Например, функция "read_and_process_chunks_multiprocess" создаст 7 файлов TMP, тогда как будет действовать "batch_merge" в это время?
while len(temp_files) > batch_size: # Пока временных файлов больше, чем размер пакета
temp_files, unique_count, duplicate_count = batch_merge(temp_files, batch_size, temp_dir, num_merge_processes) # Выполняем пакетное слияние
total_unique_count += unique_count # Обновляем общий счетчик уникальных строк
total_duplicate_count += duplicate_count # Обновляем общий счетчик дубликатов
def process_chunk_and_write_multiprocess(chunk, temp_dir):
...
unique_items = set(chunk)
sorted_chunk = sorted(unique_items)
...
if final:
for line, group in itertools.groupby(merged_iter):
outfile.write(line)
Это все конечно хорошо, но покажи исходник. Set на этапе обработки чанков делает удаление, посмотри про уникальность множеств в set(). Ограничение по файловым дескрипторам можно обходить в системе. При увеличении размера чанка скорость может как падать так и повышаться, я уже писал об этом. Все зависит от железки. Выбрал наиболее оптимальный вариант в 2 миллиона строк на чанк. ИМХО единственное что сможет поднять скорость скрипта, это обработка на GPU. Если есть время, попробуй код перевести на GPUСпасибо за предоставленный мне код
Очень надежный и информативный скрипт
Многопроцессорная обработка на моем железе добавляет около 30-40% к скорости в зависимости от настроек обработки по сравнению с однопоточным аналогом.
Я также захотел разобраться в коде и посмотреть, что можно оптимизировать.
Далее мнение дилетанта и некоторые вопросы по коду. Поскольку я не вижу хайд, я буду обсуждать код который у меня есть:
Начал я с функции обработки кусков:
Я не нашел никакого смысла в использовании set(chunk), поскольку за удаление дублей отвечает другой код. Если все таки избавиться от set(), то наиболее эффективно будет использовать chunk.sort() без переназначения и копирования, то есть без sorted_chunk = sorted(chunk).Python:def process_chunk_and_write_multiprocess(chunk, temp_dir): ... unique_items = set(chunk) sorted_chunk = sorted(unique_items) ...
Это снизит нагрузку на процессор и уменьшит расход оперативной памяти. Общая скорость скрипта не вырастет, т.к. на хорошем железе многопроцессорность легко компенсирует эту потерю эффективности.
Если же set() зачем то нужен, то следует переназначать chunk, вместо использования разных переменных, это позволит высвобождать оперативную память быстрее, поскольку перед выходом из функции идет задержка с записью в файл.
По удалению дублей также есть вопрос.
Функция слияния использует удаление дублей(merge_files_parallel) как для пакетного, так и для финального слияния. То есть, каждый раз когда мы сливаем пакетные файлы мы также удаляем дубли. Но достаточно сделать это один раз при финальном слиянии. Эта задержка также почти полностью компенсируется многопроцессорностью, но оптимизировать стоит.
Я добавил флаг final в параметры для функции удаления дублей и ограничил им функцию удаления дублей. Я также экспериментировал с алгоритмами для удаления дублей и остановился на лаконичном:
Python:if final: for line, group in itertools.groupby(merged_iter): outfile.write(line)
На мой субъективный взгляд это работает капельку быстрее и выглядит лаконичнее. Но стоит заметить, groupby не использует strip() при сравнениях. Но поскольку мы уже применяли strip() при нарезке кусков, то сравнения будут считаться корректными.
Я пробовал добавить многопроцессорную обработку для финального слияния, используя threading.Lock(). И в процессхакере это выглядело очень даже хорошо с точки зрения использования I\O(не было снижений скорости или пауз как в однопотоке, скорость чтения отличная), но по итогу на файле в 30 гб без пакетных слияний это совсем не отразилось
Numpy также не смог поднять скорость скрипта, даже не смотря на фиксированные размеры массива и длину строк.
В итоге стало понятным, что скрипт можно сделать более эффективным с точки зрения использования ресурсов, но не намного быстрее.
Единственное, что значительно улучшило общую скорость - это не использование пакетного слияния без особой необходимости.
Поскольку ограничением для heapq.merge является только OS(512 и 2048 файловых дескрипторов для Windows x32 и x64), то повышение этой планки до этих лимитов повышает скорость скрипта на 25-40%.
А в случае обработки действительно больших словарей выгоднее увеличить размер куска в 10+ раз, чем допустить хоть одно слияние. Этого вполне хватит, чтобы обработать даже очень большие словари без слияния. Если позволит оперативная память, конечно.
Я это понимаю, но зачем нужно удаление на уровне чанков? Ведь если хоть один дупликат не будет находиться в пределах одного куска, то set() полностью теряет смыслЭто все конечно хорошо, но покажи исходник. Set на этапе обработки чанков делает удаление, посмотри про уникальность множеств в set().
def process_chunk_and_write_multiprocess(chunk, temp_dir):
try:
logger.info(f"Начало обработки чанка размером {len(chunk)} строк (процесс)")
heavy_computation(10000000) # Симуляция тяжелых вычислений
chunk.sort()
unique_filename = os.path.join(temp_dir, f"tempfile_{uuid.uuid4()}.tmp") # Генерируем уникальное имя файла
with open(unique_filename, 'w', encoding='utf-8', errors='replace') as file: # Сохранение в файл
for line in chunk:
file.write(line)
return unique_filename
except Exception as e:
logger.error(f"Ошибка при обработке чанка: {e}")
return None
def merge_files_parallel(temp_files, output_file, chunk_size=2000000, num_threads=24, final=False):
try:
logger.info(f"Параллельное слияние {len(temp_files)} временных файлов с использованием {num_threads} потоков")
unique_count = 0
duplicate_count = 0
with ExitStack() as stack, open(output_file, 'a', encoding='utf-8', errors='replace') as outfile:
file_iters = [stack.enter_context(open(temp_file, "r", encoding='utf-8', errors='replace')) for temp_file in temp_files]
merged_iter = heapq.merge(*file_iters)
if final:
for line, group in groupby(merged_iter):
outfile.write(line)
unique_count+=1
#duplicate_count+=len(list(group))-1 #при желании раскомментировать
else:
for line in merged_iter:
outfile.write(line)
logger.info(f"Слияние завершено. Уникальных строк: {unique_count}, дублей удалено: {duplicate_count}")
return unique_count, duplicate_count, output_file
except Exception as e:
logger.error(f"Ошибка при параллельном слиянии файлов: {e}")
return 0, 0
Потому-что на границах слияния файлов могут быть погрешности, и чистку нужно делать на 3х этапах работы алгоритма (независимо будут ли дубли на этапе создания чанков или нет). Формирование чанков, пакетное слияние, финальное слияние. В другом случае по-моему возникали погрешности при тестах на крупных файлах на размерах от 100 гигабайт. (Да я вспомнил, с antikrya когда тестировали, он жаловался что дубли чистятся не все в его 200 гиговом файле, мне пришлось придумать такое решение).Я это понимаю, но зачем нужно удаление на уровне чанков? Ведь если хоть один дупликат не будет находиться в пределах одного куска, то set() полностью теряет смысл
На сколько я могу судить, погрешность может возникать, только если нарушена сортировка, либо из-за ошибки кодировки, невидимых спецсимволов и тд, потому что финальный алгоритм прост и надежен. Если ошибается он, значит что-то не в порядке. Но heapq.merge тоже ориентируется на отсортированные куски. Он не должен ошибаться, потому что нет какой либо гонки и тд.Потому-что на границах слияния файлов могут быть погрешности, и чистку нужно делать на 3х этапах работы алгоритма (независимо будут ли дубли на этапе создания чанков или нет). Формирование чанков, пакетное слияние, финальное слияние. В другом случае по-моему возникали погрешности при тестах на крупных файлах на размерах от 100 гигабайт. (Да я вспомнил, с antikrya когда тестировали, он жаловался что дубли чистятся не все в его 200 гиговом файле, мне пришлось придумать такое решение)
def process_chunk_and_write_multiprocess(chunk, temp_dir):
'''
with open(unique_filename, 'w', encoding='utf-8') as temp_file:
temp_file.write("\n".join(sorted_chunk) + "\n")
'''
Ну поэксперементируйте (в чужих логах мне копаться права не давали во время тестов (но было много ошибок при их обработке), хотя я это и предполагал, поэтому сделал более универсальный вариант который жрет логи и с мусором и образцово-сгенерированные рандомным генератором). Еще просьба попробовать как подметил Guron_18 мне в ПМ обрабатывать строки вместо:На сколько я могу судить, погрешность может возникать, только если нарушена сортировка, либо из-за ошибки кодировки, невидимых спецсимволов и тд, потому что финальный алгоритм прост и надежен. Если ошибается он, значит что-то не в порядке. Но heapq.merge тоже ориентируется на отсортированные куски. Он не должен ошибаться, потому что нет какой либо гонки и тд.
Я сейчас смотрю на оригинальный код и вижу:
Думаю, сортировка нарушается из-за лишней "\n". Это как раз граница куска, как вы и говорилиPython:def process_chunk_and_write_multiprocess(chunk, temp_dir): ''' with open(unique_filename, 'w', encoding='utf-8') as temp_file: temp_file.write("\n".join(sorted_chunk) + "\n") '''
open(f, 'r', encoding='utf-8', errors='replace')
open(file=file, mode='r+b')
Не должно, это было сделано для того чтобы в финале после создания файла в конце всегда был переход каретки. =\Думаю, сортировка нарушается из-за лишней "\n". Это как раз граница куска, как вы и говорили
Я протестировал на файле в 3 гб. С лишним пробелом или без, каждый прогон давал разные результаты на одном и том же файле. Разница невелика, в 4-37 файлов, но она ведь есть и это странноНу поэксперементируйте (в чужих логах мне копаться права не давали во время тестов (но было много ошибок при их обработке), хотя я это и предполагал, поэтому сделал более универсальный вариант который жрет логи и с мусором и образцово-сгенерированные рандомным генератором). Еще просьба попробовать как подметил Guron_18 мне в ПМ обрабатывать строки вместо:
Python:open(f, 'r', encoding='utf-8', errors='replace')
Вот так побайтово:
Python:open(file=file, mode='r+b')
open(file=file, mode='r+b'), потому что чтение пойдет быстрее и нужно будет лишь придумать алгоритм для расшифровки строк, либо только для итерации, чтобы обрабатывать хекс, если это будет так работать. Также стоит рассмотреть cchardet или charset-normalizer, чтобы динамически определять кодировку файла. Может, ansi строки будут читаться и записываться быстрее, обычно этого достаточно.Вам нужно сделать образцовый файл, который обработан в памяти тем же сетом например и уже по нему сверяться применяя свои изменения, если расхождения например есть хотя-бы в 1 байт на выходе от образцового, значит алгоритм работает неправильно.=) Я же это протестировал на размерах 1, 100, 300 гигабайт (большие файлы генерировал рандомными генераторами и мерджил в них дубли по рандомным участкам). Все было ок. Также при повторных сканированиях исходящих файлов всегда должно быть 0 дублей по счетчику, если они есть, значит что-то отработало неверно при первой обработке. По мне лучше сосредоточится на GPU вычислениях, ну если найдете способ который работает быстрее и без погрешностей, будет круто, я же остановился на том варианте который под хайдом, и натыкался на те-же проблемы с которыми столкнулись Вы. =)Я протестировал на файле в 3 гб. С лишним пробелом или без, каждый прогон давал разные результаты на одном и том же файле. Разница невелика, в 4-37 файлов, но она ведь есть и это странно
Без пакетной сортировки он находит на 100-300 дублей больше, но все равно каждый раз по разному.
Мда, мой скрипт оказывается страдает тем же, а его путь более отслеживаемый и я не понимаю, где может быть проблема
Также заметил, что set() иногда убирает дубли, которые основной алгоритм пропускает. Но при этом сам set() отсеивает еще меньше в одиночку.
Возможно проблема в кодировке файла, там местами присутствуют кракозябры.
Это может быть интересным,open(file=file, mode='r+b'),потому что чтение пойдет быстрее и нужно будет лишь придумать алгоритм для расшифровки строк, либо только для итерации, чтобы обрабатывать хекс, если это будет так работать. Также стоит рассмотреть cchardet или charset-normalizer, чтобы динамически определять кодировку файла. Может, ansi строки будут читаться и записываться быстрее, обычно этого достаточно.
import os
import time
import uuid
import heapq
import shutil
import logging
from contextlib import ExitStack
from collections import namedtuple
from colorlog import ColoredFormatter
from charset_normalizer import from_bytes
from multiprocessing import Lock, Value, Process, JoinableQueue
from itertools import islice, groupby, cycle
from concurrent.futures import ProcessPoolExecutor, as_completed
def get_unique_filename(): # Функция для генерации уникального имени файла
return os.path.join(temp_dir, f"temp_file_{uuid.uuid4().hex}.tmp")
def get_files_path(file_path): # Функция для получения списка всех файлов во временной папке
return [os.path.join(file_path, f) for f in os.listdir(file_path) if os.path.isfile(os.path.join(file_path, f))]
def generate_file_paths(file_path, count): # функция для генерации имен выходного файла
base_name, ext = os.path.splitext(file_path)
directory = os.getcwd() if not os.path.isabs(file_path) else os.path.dirname(file_path)
if more_output_files > 1:
return [os.path.join(f"{directory}\\{output_file_dir}", f"{base_name}_{i + 1}{ext}") for i in range(count)]
else:
return [os.path.join(f"{directory}\\{output_file_dir}", f"{base_name}{ext}")]
def worker_final(chunk_queue, files, unique_count, locked): # Функция многопоточного финального слияния
try:
outfile_cycle = cycle(files)
while True:
chunk = chunk_queue.get()
if chunk is None:
break
chunk = ''.join(line for line, _ in groupby(chunk))
unique_count.value += chunk.count('\n')
with locked:
current_file = next(outfile_cycle)
with open(current_file, mode.write_mode, encoding=mode.local_encoding, errors=mode.error) as outfile:
if bin_mode:
outfile.write(chunk.encode(mode.global_encoding))
else:
outfile.write(chunk)
except Exception as e:
logger.error(f"Ошибка при параллельной обработке финальных кусков {e}")
def parallel_final_merge(merged_iter): # функция для запуска многопоточного финального слияния
processes = []
chunk_queue = JoinableQueue()
for _ in range(num_processes):
p = Process(target=worker_final, args=(chunk_queue, generate_file_paths(output_file, more_output_files) ,total_unique_count, lock,))
p.start()
processes.append(p)
while True:
chunk = list(islice(merged_iter, chunk_size))
if not chunk:
for _ in range(num_processes):
chunk_queue.put(None)
break
if chunk_queue.qsize():
chunk_queue.join()
chunk_queue.put(chunk)
for p in processes:
p.join()
def standard_final_merge(merged_iter): # функция для однопоточного финального слияния
with ExitStack() as stack:
files_cycle = cycle(
[stack.enter_context(open(final_file, mode.write_mode, encoding=mode.local_encoding, errors=mode.error))
for final_file in generate_file_paths(output_file, more_output_files)])
while True:
chunk = ''.join(islice((line for line, _ in groupby(merged_iter)), chunk_size))
if not chunk:
break
total_unique_count.value += chunk.count('\n')
outfile = next(files_cycle)
if bin_mode:
outfile.write(chunk.encode(mode.global_encoding))
else:
outfile.write(chunk)
def batch_merge(merged_iter, outfile_name): # функция для многопоточного пакетного слияния
with open(outfile_name, 'w', encoding=mode.global_encoding, errors='replace') as outfile:
outfile.writelines(line for line in merged_iter)
def main_merge(temp_files, final=False): # функция для запуска слияния
try:
logger.info(f"Запущено слияние {len(temp_files)} временных файлов")
with ExitStack() as stack:
file_iters = [stack.enter_context(open(temp_file, "r", encoding=mode.global_encoding, errors='replace')) for temp_file
in temp_files]
merged_iter = heapq.merge(*file_iters)
if final:
if final_thread_mode:
parallel_final_merge(merged_iter)
else:
standard_final_merge(merged_iter)
else:
batch_merge(merged_iter, get_unique_filename())
except Exception as e:
logger.error(f"Ошибка слияния файлов: {e}")
def batch_threads_merge(): # функция для запуска многопоточного пакетного слияния
try:
with ProcessPoolExecutor(max_workers=num_processes) as executor:
futures = []
temp_files = get_files_path(temp_dir)
for i in range(0, len(temp_files), batch_size):
batch = temp_files[i:i + batch_size]
futures.append(executor.submit(main_merge, batch, False))
for _ in as_completed(futures):
logger.info(f"Пакет собран!")
for temp_file in temp_files:
logger.info(f"Удаление временного файла: {temp_file}")
os.remove(temp_file)
except Exception as e:
logger.error(f"Ошибка пакетного слияния файлов: {e}")
return temp_files
def worker_cut_bin(chunk_queue, orig_count): # функция многопоточной нарезки кусков в бинарном режиме
try:
while True:
chunk = chunk_queue.get()
chunk_queue.task_done()
if chunk is None:
break
chunk = chunk.decode(mode.global_encoding).split('\n')
logger.info(f"Начало обработки куска размером {len(chunk)} строк (процесс)")
orig_count.value += len(chunk)
chunk.sort()
chunk = ''.join(f"{line.strip()}\n" for line in chunk).encode(mode.global_encoding)
with open(get_unique_filename(), 'wb') as file:
file.write(chunk)
except Exception as e:
logger.error(f"Ошибка при обработке куска: {e}")
def worker_cut(chunk_queue, orig_count): # функция многопоточной нарезки кусков в построчном режиме
try:
while True:
chunk = chunk_queue.get()
chunk_queue.task_done()
if chunk is None:
break
logger.info(f"Начало обработки куска размером {len(chunk)} строк (процесс)")
orig_count.value += len(chunk)
chunk.sort()
chunk = ''.join(f"{line}\n" for line in chunk)
with open(get_unique_filename(), 'w', encoding=mode.global_encoding, errors='replace') as file:
file.write(chunk)
except Exception as e:
logger.error(f"Ошибка при обработке куска: {e}")
def cut_chunks(file_path): #функция для запуска многопоточной нарезки кусков
with open(file_path, mode.read_mode, encoding=mode.local_encoding, errors=mode.error) as infile:
processes = []
chunk_queue = JoinableQueue()
for _ in range(num_processes):
process = Process(target=mode.worker, args=(chunk_queue, total_orig_count))
process.start()
processes.append(process)
while True:
chunk = mode.cut_read_func(infile)
if not chunk:
for _ in range(num_processes):
chunk_queue.put(None)
break
if chunk_queue.qsize():
chunk_queue.join()
chunk_queue.put(chunk)
for process in processes:
process.join()
def read_text_chunk(infile):
return [line.strip() for line in islice(infile, chunk_size)]
def read_binary_chunk(infile): # читаем файл в бинарном режиме
chunk = infile.read(chunk_size * byte_coefficient)
if not chunk:
return None
if chunk[-1] != b"\n": # если есть недочитанная строка
chunk += last_line(infile) # дочитываем ее
return chunk
def last_line(infile): # функция для дочитывания последней строки
byte_buffer = b""
while True:
next_byte = infile.read(1)
if not next_byte or next_byte == b"\n":
return byte_buffer + next_byte
byte_buffer += next_byte
def create_service_dir(path):
os.makedirs(os.path.join(os.getcwd(), path), exist_ok=True)
def clean_tempdir():
try:
shutil.rmtree(temp_dir)
except Exception as e:
logger.error(f"Ошибка при удалении временных файлов: {e}")
def get_mode(): # сервисная функция для подстройки под режим
read_mode = 'rb' if bin_mode else 'r'
write_mode = 'ab' if final_thread_mode and bin_mode else 'wb' if bin_mode else 'a' if final_thread_mode else 'w'
global_encoding = get_encoding()
local_encoding = None if bin_mode else global_encoding
error = None if bin_mode else 'replace'
cut_read = read_binary_chunk if bin_mode else read_text_chunk
worker = worker_cut_bin if bin_mode else worker_cut
return FileMode(read_mode=read_mode, write_mode=write_mode, global_encoding=global_encoding, local_encoding=local_encoding, error=error, cut_read_func=cut_read, worker=worker)
def get_encoding_from_file(file): # Функция определения кодировки
with open(file, 'rb') as f:
file_bytes = f.read(1000)
result = from_bytes(file_bytes).best()
if result:
return result.encoding
else:
return 'utf-8'
def get_encoding():
temp_set = set()
for file_path in more_input_files:
temp_set.add(get_encoding_from_file(file_path))
if len(temp_set) == 1:
return temp_set.pop()
else:
return 'utf-8'
def get_logger():
formatter = ColoredFormatter(
"%(log_color)s%(asctime)s - %(levelname)s - %(message)s",
log_colors={'DEBUG': 'cyan', 'INFO': 'green', 'WARNING': 'yellow', 'ERROR': 'red', 'CRITICAL': 'bold_red'}
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
log = logging.getLogger()
log.addHandler(handler)
log.setLevel(logging.INFO)
return log
def main():
try:
tic = time.perf_counter()
logger.info(f"Создаем папки...")
create_service_dir(temp_dir)
create_service_dir(output_file_dir)
logger.info(f"Начата нарезка кусков...")
if input_file:
more_input_files.append(input_file) # если указан input_file, добавляем его в список,
for file_path in more_input_files: # перебираем входные файлы
cut_chunks(file_path) #и нарезаем их на куски
while len(get_files_path(temp_dir)) > max_batch: # если кусков больше, чем max_batch(2000 для windows x64), то сливаем пакеты по batch_size
logger.info(f"Начато пакетное слияние временных файлов...")
batch_threads_merge() # то запускаем многопоточное пакетное слияние
logger.info(f"Начато финальное слияние...")
main_merge(get_files_path(temp_dir), True)
tac = time.perf_counter()
logger.info(
f"\n\n\n\t\t###########################\n\t\t##Готово, вы великолепны!##\n\t\t###########################\n\n"
f"\t\tВсего строк:\t{total_orig_count.value}\n\t\tУникальных:\t{total_unique_count.value}"
f"\n\t\tДубликатов:\t{total_orig_count.value-total_unique_count.value}\n\t\t"
f"Общее время:\t{tac - tic:.2f} сек.\n\n\n") # подводим итоги
except Exception as e:
logger.error(f"Ошибка в главной функции: : {e}")
finally:
logger.info(f"Удаление временных файлов в {temp_dir}...")
clean_tempdir() # подтираем за собой
input_file = '' # указываем входной файл, либо используем more_input_files
bin_mode = True # включаем бинарный режим
final_thread_mode = False # многопроцессорное финальное слияние
output_file = "test_out.txt" # конечный файл
output_file_dir = "OUTPUT" # будет в этой папке
input_file_dir = "INPUT" # папка для входных файлов
more_input_files = get_files_path(input_file_dir) # берем все файлы из input_file_dir, можно заменить на [ваши файлы], либо использовать input_file
more_output_files = 1 # разбить финальное слияние на N файлов
chunk_size = 2000000 # максимум строк в куске
num_processes = 10 # количество процессов
max_batch = 2000 # максимум кусов без пакетного слияния
batch_size = max_batch // num_processes # максимум кусков в пакете
byte_coefficient = 15 # коэффициент умножения chunk_size на byte_coefficient, считаем что 1 строка состоит из 15 байт. В зависимости от специфики входного файла можно увеличить или уменьшить
total_orig_count = Value('i', 0) # считаем общее количество строк
total_unique_count = Value('i', 0) # считаем уникальные строки
temp_dir = "TEMP" # название папки для временных файлов
lock = Lock() # устанавливаем Lock
logger = get_logger() # устанавливаем логирование
FileMode = namedtuple('FileMode', ['read_mode', 'write_mode', 'global_encoding', 'local_encoding', 'error', 'cut_read_func', 'worker']) # переменные для выбора режимов
mode = get_mode() # подгоняем переменные под режим
if __name__ == "__main__":
main()