I have two different functions (funcA and funcB) that I want to execute concurrently to cut down on overall execution time.
funcA is an API call that takes somewhere between 5 and 7 seconds.
funcB is a CPU-intensive operation that uses ML algorithms and takes somewhere between 7 and 10 seconds.
A simple dummy version of funcA and funcB would be:
def funcA(inpA):
    res1 = apiCall1(inpA)
    res2 = apiCall2(res1)
    if res2['requireAPICall3']:
        res3 = apiCall3(res2)
        return res3
    else:
        return res2

def funcB(inpB):
    modelPath = downloadModel()
    model = loadModel(modelPath)
    prediction = predict(model, inpB)
    return prediction
This will all be done on a Python server.
The request will contain two parts, let's say inpA and inpB; inpA will be passed to funcA and inpB to funcB.
Currently the overall execution is as follows:
def processRequest(request):
    response = {}
    response['outA'] = funcA(request['inpA'])
    response['outB'] = funcB(request['inpB'])
    return response
So the overall execution takes somewhere between 12 and 17 seconds. But since the two functions take separate inputs and do not depend on each other, executing them simultaneously would make the overall request take much less time (7 to 10 seconds for this example).
Every function being used is synchronous. I can make funcA async, but making funcB async is not an option right now; maybe it can be wrapped in an async function. I'm not totally sure about asynchronous programming in Python, so any help would be greatly appreciated. Maybe multithreading could be an option as well.
I tried using the asyncio library and its gather function, but for some reason it performed the same as the processRequest function described above.
Below is the code I tried using asyncio:
async def funcA(inpA):
    return apiCall(inpA)

async def funcB(inpB):
    return predict(inpB)

async def processRequest(req):
    t1 = asyncio.create_task(funcA(req['inpA']))
    t2 = asyncio.create_task(funcB(req['inpB']))
    return await asyncio.gather(t1, t2)
I updated the apiCall function as below to test Anentropic's answer.
async def apiCall(inpA):
    res1 = await apiCall1(inpA)
    res2 = await apiCall2(res1)
    if res2['requireAPICall3']:
        res3 = await apiCall3(res2)
        return res3
    else:
        return res2
You can use ProcessPoolExecutor from the concurrent.futures module of Python.
I wrote a simple demo for your problem:
from concurrent.futures import ProcessPoolExecutor
import time
from time import sleep

def funcA(x):
    sleep(7)  # simulate a long running task
    return x * x

def funcB(y):
    sleep(10)  # simulate a long running task
    return y + y

# the global executor
executor = ProcessPoolExecutor(max_workers=2)

def main(x, y):
    res = []
    start = time.time()
    # submit the tasks
    task_A = executor.submit(funcA, x)
    task_B = executor.submit(funcB, y)
    # wait for the tasks to complete
    res.append(task_A.result())
    res.append(task_B.result())
    end = time.time()
    print("Time taken: {}".format(end - start))
    return res

if __name__ == '__main__':
    print(main(2, 3))
Output:
Time taken: 10.005340814590454
[4, 6]
It took ~10s to finish a 7s Task-A and a 10s Task-B.
I prefer ProcessPoolExecutor over ThreadPoolExecutor because:
ProcessPoolExecutor uses the multiprocessing module, which allows it to side-step the Global Interpreter Lock but also means that only picklable objects can be executed and returned.
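Applied to the processRequest function from the question, the same pattern would look roughly like this (a sketch, assuming funcA, funcB, and the request dict are as defined above, and that their inputs and return values are picklable):

from concurrent.futures import ProcessPoolExecutor

# one executor reused for the lifetime of the server process
executor = ProcessPoolExecutor(max_workers=2)

def processRequest(request):
    # submit both functions so they run in separate worker processes
    taskA = executor.submit(funcA, request['inpA'])
    taskB = executor.submit(funcB, request['inpB'])
    # result() blocks until each future is done, but the two run concurrently,
    # so the total time is roughly the slower of the two instead of their sum
    return {'outA': taskA.result(), 'outB': taskB.result()}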
As long as the code in apiCall is fully async (i.e. you are using an async library such as https://www.python-httpx.org/async/ to make the API requests), this pattern should work:
import asyncio
import time

async def api_call():
    await asyncio.sleep(3)
    return "A"

async def funcA():
    return await api_call()

async def funcB():
    time.sleep(3)
    return "B"

async def process():
    start = time.time()
    t1, t2 = await asyncio.gather(funcA(), funcB())
    end = time.time()
    print(f"elapsed: {end - start}s")
    return t1, t2

result = asyncio.run(process())
print(result)
For the purposes of the demo I have used time.sleep in funcB as a non-async (blocking) call representing your CPU-intensive operation, and asyncio.sleep in funcA as a non-blocking call representing your API request.
If you run this code you should see 3s elapsed instead of 6s.
If you substitute time.sleep for the asyncio.sleep you will find that it takes 6s, indicating that any blocking code in the async functions will obstruct your concurrency.
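If making funcB async is not an option, one way to combine the two answers is to keep the fully async apiCall and push the blocking funcB into an executor with loop.run_in_executor, so that asyncio.gather can still run both concurrently. Below is a rough sketch reusing the apiCall and funcB names and the request shape from the question; a ProcessPoolExecutor is used because funcB is CPU-bound (which assumes its input and result are picklable), and asyncio.to_thread would be an alternative if funcB spends most of its time in library code that releases the GIL:

import asyncio
from concurrent.futures import ProcessPoolExecutor

# worker process for the CPU-bound funcB
executor = ProcessPoolExecutor(max_workers=1)

async def processRequest(req):
    loop = asyncio.get_running_loop()
    outA, outB = await asyncio.gather(
        apiCall(req['inpA']),                                # fully async API calls
        loop.run_in_executor(executor, funcB, req['inpB']),  # blocking ML work off the event loop
    )
    return {'outA': outA, 'outB': outB}

With this, the request should take roughly as long as the slower of the two parts (7 to 10 seconds in the example) rather than their sum.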