We are writing a web service using FastAPI that is going to be hosted in Kubernetes. For auditing purposes, we need to save the raw JSON body of the request/response for specific routes. The body size of both the request and response JSON is about 1MB, and preferably, this should not impact the response time. How can we do that?
Option 1 - Using a Middleware

You could use a Middleware. A middleware takes each request that comes to your application, and hence allows you to handle the request before it is processed by any endpoint, as well as the response, before it is returned to the client. To create a middleware, you can use the @app.middleware("http") decorator on top of a function, as shown in the example below.
As you need to consume the request body from the stream inside the middleware—using either request.body() or request.stream(), as shown in this answer (behind the scenes, the former method actually calls the latter, see here)—it won't be available when you later pass the request to the corresponding endpoint. Thus, you can follow the approach described in this post to make the request body available down the line (i.e., using the set_body() function below). UPDATE: This issue has now been fixed, and hence there is no need for that workaround, if you are using FastAPI 0.108.0 or a later version.
As for the response body, you can use the same approach as described in this answer to consume the body and then return the response to the client. Either option described in the aforementioned linked answer would work; the example below, however, uses Option 2, which stores the body in a bytes object and returns a custom Response directly (along with the status_code, headers and media_type of the original response).
To log the data, you could use a BackgroundTask, as described in this answer, as well as this answer and this answer. A BackgroundTask will run only once the response has been sent (see the Starlette documentation as well); thus, the client won't have to wait for the logging to complete before receiving the response (and hence, the response time won't be noticeably impacted).
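For illustration, a background task can be attached to any Response. The minimal, standalone sketch below (with a hypothetical write_audit_log function, separate from the middleware example further down) is only meant to show that the task runs after the response has gone out:

from fastapi import FastAPI, Response
from starlette.background import BackgroundTask

app = FastAPI()

def write_audit_log(message: str):
    # hypothetical logging function; it is executed only after the response has been sent
    print(message)

@app.get('/ping')
def ping():
    task = BackgroundTask(write_audit_log, 'pong was returned')
    # the client receives the response immediately; the task runs afterwards
    return Response(content='pong', media_type='text/plain', background=task)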
Note

If you had a streaming request or response with a body that wouldn't fit into your server's RAM (for example, imagine a body of 100GB on a machine with 8GB of RAM), it would become problematic, as you are storing the data in RAM, which wouldn't have enough space available to accommodate the accumulated data. Also, in the case of a large response (e.g., a large FileResponse or StreamingResponse), you may be faced with Timeout errors on the client side (or on the reverse proxy side, if you are using one), as you would not be able to respond to the client until you have read the entire response body (since you are looping over response.body_iterator). You mentioned that "the body size of both request and response JSON is about 1MB"; hence, that should normally be fine (however, it is always good practice to consider beforehand matters such as how many requests your API is expected to serve concurrently, what other applications might be using the RAM, etc., in order to determine whether this is an issue or not). If you needed to, you could limit the number of requests to your API endpoints using, for example, SlowAPI (as shown in this answer).
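For reference, a minimal rate-limiting sketch with SlowAPI might look like the following; the '5/minute' limit and the /limited route are arbitrary, illustrative values (see the linked answer and the SlowAPI documentation for the details):

from fastapi import FastAPI, Request
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.errors import RateLimitExceeded
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)
app = FastAPI()
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)

@app.get('/limited')
@limiter.limit('5/minute')  # illustrative limit
async def limited(request: Request):
    return {'msg': 'ok'}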
Applying the middleware to specific routes only

You could limit the usage of the middleware to specific endpoints by either checking the request.url.path inside the middleware against a pre-defined list of routes for which you would like to log the request and response, as described in this answer (see the "Update" section; a short sketch of this check is also given after the example below), or by using a custom APIRoute class, as demonstrated in Option 2 below.
from fastapi import FastAPI, APIRouter, Response, Request
from starlette.background import BackgroundTask
from fastapi.routing import APIRoute
from starlette.types import Message
from typing import Dict, Any
import logging
app = FastAPI()
logging.basicConfig(filename='info.log', level=logging.DEBUG)
def log_info(req_body, res_body):
    logging.info(req_body)
    logging.info(res_body)


# not needed when using FastAPI>=0.108.0
'''
async def set_body(request: Request, body: bytes):
    async def receive() -> Message:
        return {'type': 'http.request', 'body': body}

    request._receive = receive
'''

@app.middleware('http')
async def some_middleware(request: Request, call_next):
    req_body = await request.body()
    # await set_body(request, req_body)  # not needed when using FastAPI>=0.108.0
    response = await call_next(request)

    # consume the response body from the iterator, so that it can be logged
    chunks = []
    async for chunk in response.body_iterator:
        chunks.append(chunk)
    res_body = b''.join(chunks)

    # log the request/response bodies only after the response has been sent
    task = BackgroundTask(log_info, req_body, res_body)
    return Response(content=res_body, status_code=response.status_code,
                    headers=dict(response.headers), media_type=response.media_type, background=task)


@app.post('/')
def main(payload: Dict[Any, Any]):
    return payload
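Regarding the first approach to restricting the middleware mentioned earlier (i.e., checking request.url.path), a minimal sketch, reusing the imports and the log_info() function from the example above, could look as follows. The routes_to_log list is a hypothetical example, and this middleware would be used instead of (not in addition to) the one above:

routes_to_log = ['/', '/items']  # hypothetical list of paths to audit

@app.middleware('http')
async def log_specific_routes(request: Request, call_next):
    if request.url.path not in routes_to_log:
        # not an audited route, so pass the request through untouched
        return await call_next(request)
    req_body = await request.body()
    response = await call_next(request)
    res_body = b''.join([chunk async for chunk in response.body_iterator])
    task = BackgroundTask(log_info, req_body, res_body)
    return Response(content=res_body, status_code=response.status_code,
                    headers=dict(response.headers), media_type=response.media_type, background=task)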
In case you would like to perform some validation on the request body—for example, ensuring that the request body size does not exceed a certain value—instead of using request.body(), you can process the body one chunk at a time using the .stream() method, as shown below (similar to this answer).
@app.middleware('http')
async def some_middleware(request: Request, call_next):
    chunks = []
    async for chunk in request.stream():
        chunks.append(chunk)
    req_body = b''.join(chunks)
    ...
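For instance, a rough sketch of such a size check, reusing the imports from the example above, could look like the one below; the 1MB limit and the 413 status code are illustrative choices:

MAX_BODY_SIZE = 1_048_576  # illustrative 1MB limit

@app.middleware('http')
async def some_middleware(request: Request, call_next):
    chunks = []
    received = 0
    async for chunk in request.stream():
        received += len(chunk)
        if received > MAX_BODY_SIZE:
            # reject the request without reading the rest of the body
            return Response(content='Request body too large', status_code=413)
        chunks.append(chunk)
    req_body = b''.join(chunks)
    ...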
Option 2 - Using a custom APIRoute class

You can alternatively use a custom APIRoute class—similar to here and here—which, among other things, would allow you to manipulate the request body before it is processed by your application, as well as the response body before it is returned to the client. This option also allows you to limit the usage of this class to the routes you wish, as only the endpoints under the APIRouter (i.e., router in the example below) will use the custom APIRoute class.

It should be noted that the same comments mentioned in Option 1 above, under the "Note" section, apply to this option as well. For example, if your API returns a StreamingResponse—such as the /video route of the example below, which streams a video file from an online source (public videos to test this can be found here, and you can even use a longer video than the one used below to see the effect more clearly)—you may come across issues on the server side, if your server's RAM can't handle it, as well as delays on the client side (and on the reverse proxy, if you are using one), due to the whole (streaming) response being read and stored in RAM before it is returned to the client (as explained earlier). In such cases, you could exclude such endpoints that return a StreamingResponse from the custom APIRoute class and limit its usage only to the desired routes—especially if it is a large video file, or even live video, which wouldn't likely make much sense to have stored in the logs—simply by not using the @<name_of_router> decorator (i.e., @router in the example below) for such endpoints, but rather the @<name_of_app> decorator (i.e., @app in the example below), or some other APIRouter or sub-application (a brief sketch of this is given after the example below).
from fastapi import FastAPI, APIRouter, Response, Request
from starlette.background import BackgroundTask
from starlette.responses import StreamingResponse
from fastapi.routing import APIRoute
from starlette.types import Message
from typing import Callable, Dict, Any
import logging
import httpx
def log_info(req_body, res_body):
    logging.info(req_body)
    logging.info(res_body)


class LoggingRoute(APIRoute):
    def get_route_handler(self) -> Callable:
        original_route_handler = super().get_route_handler()

        async def custom_route_handler(request: Request) -> Response:
            req_body = await request.body()
            response = await original_route_handler(request)
            tasks = response.background

            if isinstance(response, StreamingResponse):
                chunks = []
                async for chunk in response.body_iterator:
                    chunks.append(chunk)
                res_body = b''.join(chunks)

                task = BackgroundTask(log_info, req_body, res_body)
                response = Response(content=res_body, status_code=response.status_code,
                                    headers=dict(response.headers), media_type=response.media_type)
            else:
                task = BackgroundTask(log_info, req_body, response.body)

            # check if the original response had background tasks already attached to it
            if tasks:
                tasks.add_task(task)  # add the new task to the tasks list
                response.background = tasks
            else:
                response.background = task

            return response

        return custom_route_handler
app = FastAPI()
router = APIRouter(route_class=LoggingRoute)
logging.basicConfig(filename='info.log', level=logging.DEBUG)
@router.post('/')
def main(payload: Dict[Any, Any]):
    return payload


@router.get('/video')
def get_video():
    url = 'https://storage.googleapis.com/gtv-videos-bucket/sample/ForBiggerBlazes.mp4'

    def gen():
        with httpx.stream('GET', url) as r:
            for chunk in r.iter_raw():
                yield chunk

    return StreamingResponse(gen(), media_type='video/mp4')
app.include_router(router)
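If, as discussed above, you preferred not to have the video stream read into RAM and logged, you could register that endpoint directly on the app (or on another APIRouter that does not use LoggingRoute). A brief sketch of that change, replacing the @router.get('/video') endpoint above:

@app.get('/video')  # registered on the app directly, so the LoggingRoute class is not applied
def get_video():
    # ...same streaming logic as in the /video endpoint above
    ...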
You may also try customizing the APIRouter using a custom APIRoute class, as shown in the official FastAPI documentation:
import time
from typing import Callable
from fastapi import APIRouter, FastAPI, Request, Response
from fastapi.routing import APIRoute
class TimedRoute(APIRoute):
    def get_route_handler(self) -> Callable:
        original_route_handler = super().get_route_handler()

        async def custom_route_handler(request: Request) -> Response:
            before = time.time()
            response: Response = await original_route_handler(request)
            duration = time.time() - before
            response.headers["X-Response-Time"] = str(duration)
            print(f"route duration: {duration}")
            print(f"route response: {response}")
            print(f"route response headers: {response.headers}")
            return response

        return custom_route_handler
app = FastAPI()
router = APIRouter(route_class=TimedRoute)
@app.get("/")
async def not_timed():
    return {"message": "Not timed"}


@router.get("/timed")
async def timed():
    return {"message": "It's the time of my life"}
app.include_router(router)
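To see the custom header in action, one could, for instance, call the endpoints with Starlette's TestClient (a quick, illustrative check that is not part of the documentation example):

from fastapi.testclient import TestClient

client = TestClient(app)

r = client.get('/timed')
print(r.headers.get('x-response-time'))  # set by TimedRoute
print(r.json())                          # {'message': "It's the time of my life"}

r = client.get('/')
print(r.headers.get('x-response-time'))  # None, as '/' is not registered on the router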