I have 2 text files (*.txt) that contain unique strings in the format:
udtvbacfbbxfdffzpwsqzxyznecbqxgebuudzgzn:refmfxaawuuilznjrxuogrjqhlmhslkmprdxbascpoxda
ltswbjfsnejkaxyzwyjyfggjynndwkivegqdarjg:qyktyzugbgclpovyvmgtkihxqisuawesmcvsjzukcbrzi
The first file contains 50 million such lines (4.3 GB), and the second contains 1 million lines (112 MB). One line contains 40 characters, delimiter : and 45 more characters.
Task: get unique values for both files. That is, you need a csv or txt file with lines that are in the second file and which are not in the first.
I am trying to do this using vaex (Vaex):
import vaex
base_files = ['file1.txt']
for i, txt_file in enumerate(base_files, 1):
    for j, dv in enumerate(vaex.from_csv(txt_file, chunk_size=5_000_000, names=['data']), 1):
        dv.export_hdf5(f'hdf5_base/base_{i:02}_{j:02}.hdf5')
check_files = ['file2.txt']
for i, txt_file in enumerate(check_files, 1):
    for j, dv in enumerate(vaex.from_csv(txt_file, chunk_size=5_000_000, names=['data']), 1):
        dv.export_hdf5(f'hdf5_check/check_{i:02}_{j:02}.hdf5')
dv_base = vaex.open('hdf5_base/*.hdf5')
dv_check = vaex.open('hdf5_check/*.hdf5')
dv_result = dv_check.join(dv_base, on='data', how='inner', inplace=True)
dv_result.export(path='result.csv')
As a result, I get the result.csv file with unique row values. But the verification process takes a very long time. In addition, it uses all available RAM and all processor resources. How can this process be accelerated? What am I doing wrong? What can be done better? Is it worth using other libraries (pandas, dask) for this check and will they be faster?
UPD 10.11.2020 So far, I have not found anything faster than the following option:
from io import StringIO
def read_lines(filename):
    handle = StringIO(filename)
    for line in handle:
        yield line.rstrip('\n')
def read_in_chunks(file_obj, chunk_size=10485760):
    while True:
        data = file_obj.read(chunk_size)
        if not data:
            break
        yield data
file_check = open('check.txt', 'r', errors='ignore').read()
check_set = {elem for elem in read_lines(file_check)}
with open(file='base.txt', mode='r', errors='ignore') as file_base:
    for idx, chunk in enumerate(read_in_chunks(file_base), 1):
        print(f'Checked [{idx}0 Mb]')
        for elem in read_lines(chunk):
            if elem in check_set:
                check_set.remove(elem)
print(f'Unique rows: [{len(check_set)}]')
UPD 11.11.2020: Thanks @m9_psy for the tips to improve performance. It's really faster! Currently, the fastest way is:
from io import BytesIO
check_set = {elem for elem in BytesIO(open('check.txt', 'rb').read())}
with open('base.txt', 'rb') as file_base:
    for line in file_base:
        if line in check_set:
            check_set.remove(line)
print(f'Unique rows: [{len(check_set)}]')
Is there a way to further speed up this process?
Open the free code compare tool. Paste the first code or text file in the first or left column. And paste the second code or text file (with whom you want to compare) in the second or right column. Between the two columns, there is an arrow sign.
I have a suspicion that the join operation requires n * m comparison operations where n and m are the length of the two dataframes.
Also, there's an inconsistency between your description and your code:
dv_check but not in dv_base
dv_check.join(dv_base, on='data', how='inner', inplace=True) ⟶ this means in both dv_check and dv_base
Anyhow, an idea is to use set since checking for membership in a set has a time complexity of O(1) while checking membership in a list has a complexity of O(n). If you are familiar with the SQL world, this is equivalent to moving from a LOOP JOIN strategy to a HASH JOIN strategy:
# This will take care of removing the duplicates
base_set = set(dv_base['data'])
check_set = set(dv_check['data'])
# In `dv_check` but not `dv_base`
keys = check_set - base_set
# In both `dv_check` and `dv_base`
keys = check_set & base_set
This only gives you the keys that satisfy your condition. You still have to filter the two dataframes to get the other attributes.
Finished in 1 minute and 14 seconds on my 2014 iMac with 16GB of RAM.
Let's generate a dataset to mimic your example
import vaex
import numpy as np
N = 50_000_000  # 50 million rows for base
N2 = 1_000_000  # 1 million for check
M = 40+1+45     # chars for each string
N_dup = 10_000  # number of duplicate rows in the checks
s1 = np.random.randint(ord('a'), ord('z'), (N, M), np.uint32).view(f'U{M}').reshape(N)
s2 = np.random.randint(ord('a'), ord('z'), (N2, M), np.uint32).view(f'U{M}').reshape(N2)
# make sure s2 has rows that match s1
dups = np.random.choice(N2, N_dup, replace=False)
s2[dups] = s1[np.random.choice(N, N_dup, replace=False)]
# save the data to disk
vaex.from_arrays(s=s1).export('/data/tmp/base.hdf5')
vaex.from_arrays(s=s2).export('/data/tmp/check.hdf5')
Now, to find the rows in check, that are not in base, we can join them, and drop the rows that did not match:
import vaex
base = vaex.open('/data/tmp/base.hdf5')
check = vaex.open('/data/tmp/check.hdf5')
# joined contains rows where s_other is missing
joined = check.join(base, on='s', how='left', rsuffix='_other')
# drop those
unique = joined.dropmissing(['s_other'])
# and we have everything left
unique
#      s                                                    s_other
0      'hvxursyijiehidlmtqwpfawtuwlmflvwwdokmuvxqyujfh...  'hvxursyijiehidlmtqwpfawtuwlmflvwwdokmuvxqyujfhb...
1      'nslxohrqydxyugngxhvtjwptjtsyuwaljdnprwfjnssikh...  'nslxohrqydxyugngxhvtjwptjtsyuwaljdnprwfjnssikhh...
2      'poevcdxjirulnktmvifdbdaonjwiellqrgnxhbolnjhact...  'poevcdxjirulnktmvifdbdaonjwiellqrgnxhbolnjhactn...
3      'xghcphcvwswlsywgcrrwxglnhwtlpbhlnqhjgsmpivghjk...  'xghcphcvwswlsywgcrrwxglnhwtlpbhlnqhjgsmpivghjku...
4      'gwmkxxqkrfjobkpciqpdahdeuqfenrorqrwajuqdgluwvb...  'gwmkxxqkrfjobkpciqpdahdeuqfenrorqrwajuqdgluwvbs...
...    ...                                                  ...
9,995  'uukjkyaxbjqvmwscnhewxpdgwrhosipoelbhsdnbpjxiwn...  'uukjkyaxbjqvmwscnhewxpdgwrhosipoelbhsdnbpjxiwno...
9,996  'figbmhruheicxkmuqbbnuavgabdlvxxjfudavspdncogms...  'figbmhruheicxkmuqbbnuavgabdlvxxjfudavspdncogmsb...
9,997  'wwgykvwckqqttxslahcojcplnxrjsijupswcyekxooknji...  'wwgykvwckqqttxslahcojcplnxrjsijupswcyekxooknjii...
9,998  'yfopgcfpedonpgbeatweqgweibdesqkgrxwwsikilvvvmv...  'yfopgcfpedonpgbeatweqgweibdesqkgrxwwsikilvvvmvo...
9,999  'qkavooownqwtpbeqketbvpcvxlliptitespfqkcecidfeb...  'qkavooownqwtpbeqketbvpcvxlliptitespfqkcecidfebi...
Here is another approach. The check file is about 0.1 GB (fits in memory). The base file is up to 100 GB (so process a line at a time).
Create test data and generator function to import data
from io import StringIO
# test data for base (>50 million lines)
base_file = '''a
b
c
d
e
'''
# test data for check (1 million lines)
check_file = '''d
e
f
g
'''
def read_lines(filename):
    ''' Read data file one line at a time (with generator function).'''
    handle = StringIO(filename)
    for line in handle:
        yield line.rstrip('\n')
Find elements in the check file only (check_set - base_set in @CodeDifferent's example)
check_set = {elem for elem in read_lines(check_file)}
for elem in read_lines(base_file):
    if elem in check_set:
        check_set.remove(elem)
print(check_set)
{'g', 'f'}
Find intersection (check_set & base_set in @CodeDifferent's example)
check_set = {elem for elem in read_lines(check_file)}
common_elements = set()
for elem in read_lines(base_file):
    if elem in check_set:
        common_elements.add(elem)
print(common_elements)
{'d', 'e'}
I think this approach would work best when (a) base file is much larger than check file and (b) base file is too big for in-memory data structure.
If you love us? You can donate to us via Paypal or buy me a coffee so we can maintain and grow! Thank you!
Donate Us With