Logo Questions Linux Laravel Mysql Ubuntu Git Menu
 

How do I download a large list of URLs in parallel in pyspark?

I have an RDD containing 10000 urls to be fetched.

list = ['http://SDFKHSKHGKLHSKLJHGSDFKSJH.com',
        'http://google.com',
        'http://twitter.com']
urls = sc.parallelize(list)

I need to check which urls are broken and preferably fetch the results to a corresponding RDD in Python. I tried this:

import asyncio
import concurrent.futures
import requests

async def get(url):

    with concurrent.futures.ThreadPoolExecutor() as executor:

        loop = asyncio.get_event_loop()
        futures = [
            loop.run_in_executor(
                executor, 
                requests.get, 
                i
            )
            for i in url
        ]
        return futures

async def get_response(futures):
    response = await asyncio.gather(futures,return_exceptions=True)
    return(response)

tasks = urls.map(lambda query: get(query)) # Method returns http call response as a Future[String]

results = tasks.map(lambda task: get_response(task) )
results = results.map(lambda response:'ERR' if isinstance(response, Exception) else 'OK' )
results.collect()

I get the following output which obviously is not right:

['OK', 'OK', 'OK']

I also tried this:

import asyncio
import concurrent.futures
import requests

async def get():

    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:

        loop = asyncio.get_event_loop()
        futures = [
            loop.run_in_executor(
                executor, 
                requests.get, 
                i
            )
            for i in urls.toLocalIterator()
        ]
        for response in await asyncio.gather(*futures,return_exceptions=True):
            print('{}: {}'.format(response, 'ERR' if isinstance(response, Exception) else 'OK'))
            pass


loop = asyncio.get_event_loop()
loop.run_until_complete(get())

I get the following output:

HTTPConnectionPool(host='SDFKHSKHGKLHSKLJHGSDFKSJH.com', port=80): Max retries exceeded with url: / (Caused by 
NewConnectionError('<urllib3.connection.HTTPConnection object at 0x12c834210>: Failed to establish a new connection: [Errno 8] nodename nor servname provided, or not known')): ERR
<Response [200]>: OK
<Response [200]>: OK

Desired output would be something like this:

http://SDFKHSKHGKLHSKLJHGSDFKSJH.com : ERR
http://google.com : OK
http://twitter.com : OK

But the problem with the second approach is the use of lists to store future objects. I believe that using RDD is better, since number of urls can be in millions or billions and no singe machine can handle it. Also it is not clear to me how to retrieve urls from responses.

like image 232
Val Avatar asked Oct 30 '25 02:10

Val


1 Answers

If you're using concurrent.futures, you don't need asyncio at all (it will bring you no benefits since you are running in multiple threads anyway). You can use concurrent.futures.wait() to wait for multiple futures in parallel.

I can't test your data, but it should work with code like this:

import concurrent.futures, requests

def get_one(url):
    resp = requests.get(url)
    resp.raise_for_status()
    return resp.text

def get_all():
    with concurrent.futures.ThreadPoolExecutor(max_workers=20) as executor:
        futures = [executor.submit(get_one, url)
                   for url in urls.toLocalIterator()]
    # the end of the "with" block will automatically wait
    # for all of the executor's tasks to complete

    for fut in futures:
        if fut.exception() is not None:
            print('{}: {}'.format(fut.exception(), 'ERR')
        else:
            print('{}: {}'.format(fut.result(), 'OK')

To do the same thing with asyncio, you should use aiohttp instead.

like image 129
user4815162342 Avatar answered Nov 01 '25 18:11

user4815162342