W programowaniu współczesnych aplikacji internetowych, szczególnie tych, które wymagają przetwarzania dużych ilości danych, niezwykle istotne staje się wykorzystanie asynchroniczności. W FastAPI, dzięki wbudowanej obsłudze asynchronicznych operacji, możemy osiągnąć znaczną poprawę wydajności, zwłaszcza w kontekście operacji I/O-bound, takich jak zapytania do baz danych, odczytywanie plików czy wywołania sieciowe. Celem tego rozdziału jest omówienie, jak za pomocą prostych funkcji asynchronicznych, takich jak asyncio, zarządzać równoczesnymi operacjami w FastAPI, aby poprawić responsywność aplikacji i zapewnić lepsze wykorzystanie zasobów serwera.

Pierwszym krokiem w budowie asynchronicznej aplikacji jest zdefiniowanie kontekstu uruchamiania serwera w osobnym procesie. Dla zapewnienia, że serwer uruchomi się w odpowiednim czasie, używamy menedżera kontekstu, który odpala serwer w osobnym procesie, a po chwili pozwala na wykonanie dalszych operacji, takich jak wysyłanie żądań HTTP. Przykładowa implementacja może wyglądać następująco:

python
from contextlib import contextmanager from multiprocessing import Process import time @contextmanager def run_server_in_process(): p = Process(target=run_server) p.start() time.sleep(2) # Czas na uruchomienie serwera print("Serwer działa w osobnym procesie") yield p.terminate()

Zdefiniowany w ten sposób kontekst pozwala na uruchomienie serwera w osobnym procesie, dając mu kilka sekund na start, zanim przejdziemy do dalszych operacji.

Kolejnym krokiem jest przygotowanie funkcji, która wykonuje równoczesne zapytania do określonego punktu końcowego. W tym przypadku używamy klienta asynchronicznego do wykonywania zapytań GET, które są realizowane równolegle. Funkcja ta, przyjmująca liczbę zapytań n oraz ścieżkę path, może wyglądać następująco:

python
from httpx import AsyncClient import asyncio
async def make_requests_to_the_endpoint(n: int, path: str):
async with AsyncClient(base_url="http://localhost:8000") as client: tasks = ( client.get(path, timeout=float("inf")) for _ in range(n) ) await asyncio.gather(*tasks)

Za pomocą tej funkcji, będącej podstawą asynchronicznych wywołań, możemy wykonać wiele równoczesnych zapytań do wybranego punktu końcowego w aplikacji FastAPI, co w sposób znaczący przyspiesza czas oczekiwania na odpowiedź w porównaniu do tradycyjnego przetwarzania synchronicznego.

Zgromadzenie wszystkich operacji w jednej funkcji głównej pozwala na prostsze zarządzanie czasem wykonania wszystkich zapytań. Na przykład, w poniższym kodzie używamy wcześniej zdefiniowanego menedżera kontekstu do uruchomienia serwera, wykonania równoczesnych zapytań do dwóch różnych punktów końcowych (synchronizowanego i asynchronicznego) i zmierzenia czasu, jaki upłynął od rozpoczęcia do zakończenia operacji:

python
import time
async def main(n: int = 10): with run_server_in_process(): begin = time.time() await make_requests_to_the_endpoint(n, "/sync") end = time.time()
print(f"Czas wykonania {n} zapytań do punktu /sync: {end - begin} sekund")
begin = time.time()
await make_requests_to_the_endpoint(n, "/async") end = time.time() print(f"Czas wykonania {n} zapytań do punktu /async: {end - begin} sekund")

Warto zauważyć, że w przypadku wykonania zapytań równoczesnych (przy użyciu asyncio), czas odpowiedzi w przypadku zapytań do punktu /async będzie zauważalnie krótszy niż w przypadku zapytań do punktu /sync. Jednak, aby uzyskać większą liczbę zapytań, na przykład 100, wynik może wskazać na większą różnicę czasową pomiędzy tymi dwoma podejściami.

Asynchroniczność znacząco zwiększa wydajność przy wywołaniach, które są obarczone dużym czasem oczekiwania, jak zapytania do baz danych czy zewnętrznych usług. Dzięki jej zastosowaniu serwer może obsługiwać więcej zapytań w tym samym czasie, co sprawia, że aplikacja staje się bardziej skalowalna.

Warto jednak pamiętać, że asynchroniczność nie zawsze jest rozwiązaniem, które poprawia wydajność w każdym przypadku. Zdecydowanie najlepiej sprawdza się w kontekście operacji I/O-bound. W przypadku operacji CPU-bound, takich jak intensywne obliczenia, asynchroniczność może nie przynieść oczekiwanego rezultatu. Tego rodzaju operacje wciąż mogą wymagać tradycyjnego podejścia wielowątkowego lub wieloprocesowego.

Asynchroniczność wiąże się także z koniecznością rozważenia takich kwestii jak zarządzanie transakcjami w bazach danych. W przypadku baz danych, zarówno SQL, jak i NoSQL, należy stosować odpowiednie podejście do zarządzania sesjami i transakcjami, aby zapewnić integralność danych. W FastAPI do asynchronicznej obsługi baz danych wykorzystuje się bibliotekę sqlalchemy[asyncio] dla SQL oraz motor dla MongoDB. Ważne jest również, by kod obsługi błędów był dostosowany do asynchronicznych operacji, ponieważ trudniej jest kontrolować błędy w przypadku wielu równoczesnych zadań. Z tego względu warto zaopatrzyć się w odpowiednie mechanizmy obsługi wyjątków i testować asynchroniczny kod za pomocą narzędzi wspierających takie testy.

Ostatecznie, programowanie asynchroniczne pozwala na budowanie aplikacji internetowych o wysokiej wydajności i elastyczności. Kluczowe jest jednak, by w pełni rozumieć, kiedy i jak stosować tę technologię. Nie każda aplikacja czy operacja wymaga asynchroniczności, a jej niewłaściwe zastosowanie może prowadzić do złożoności w kodzie i trudności w utrzymaniu aplikacji.

Jak efektywnie zarządzać danymi w aplikacjach FastAPI z wykorzystaniem Redis i Elasticsearch?

Współczesne aplikacje webowe muszą być nie tylko funkcjonalne, ale także wydajne i szybkie w działaniu. Kluczowym aspektem osiągnięcia tych celów jest optymalizacja procesów, w tym zarządzanie pamięcią podręczną (cache). Jednym z najczęściej wykorzystywanych narzędzi w tym zakresie jest Redis, który umożliwia przechowywanie danych w pamięci i znaczne przyspieszenie dostępu do nich, szczególnie w przypadkach, gdy dane są często używane, ale rzadko się zmieniają.

Przykładem zastosowania Redis w aplikacjach opartych na FastAPI może być implementacja cache'u dla danych o popularnych artystach w danym kraju. W tym przypadku celem jest zminimalizowanie liczby zapytań do Elasticsearch oraz skrócenie czasu odpowiedzi API poprzez przechowywanie wyników w Redis. Dzięki temu, po pierwszym zapytaniu o dane, aplikacja szybko zwróci wyniki z pamięci podręcznej, a dopiero po wygaśnięciu danych w cache'u, wykonane zostanie zapytanie do Elasticsearch, które w tym przypadku jest bazą danych NoSQL.

Początkowo, w funkcji odpowiedzialnej za pobieranie danych, sprawdzamy, czy dane dla danego kraju są już przechowywane w Redis. Jeśli tak, zwracamy je z pamięci podręcznej, co znacząco skraca czas odpowiedzi:

python
cache_key = f"top_ten_artists_{country}" cached_data = await redis_client.get(cache_key) if cached_data: logger.info(f"Returning cached data for {country}") return json.loads(cached_data)

Jeśli dane nie są dostępne w cache'u, przechodzimy do zapytania Elasticsearch, które pozwala na uzyskanie najnowszych informacji:

python
try:
response = await es_client.search(*top_ten_artists_query(country)) except BadRequestError as e: logger.error(e) raise HTTPException(status_code=400, detail="Invalid country") artists = [{"artist": record.get("key"), "views": record.get("views", {}).get("value")} for record in response["aggregations"]["top_ten_artists"]["buckets"]]

Po otrzymaniu wyników z Elasticsearch, zapisujemy je w Redis, aby przyspieszyć dostęp do nich przy kolejnych zapytaniach. Określamy również czas życia danych w pamięci podręcznej, po którym dane zostaną automatycznie usunięte:

python
await redis_client.set(cache_key, json.dumps(artists), ex=3600)

W ten sposób dane będą dostępne w pamięci przez godzinę, a po jej upływie użytkownik ponownie otrzyma dane z Elasticsearch. Dzięki zastosowaniu Redis, czas odpowiedzi na kolejne zapytania będzie znacznie krótszy, co zwiększy wydajność aplikacji, szczególnie w przypadku częstych zapytań o te same dane.

Pomimo tego, że ta implementacja bazuje na Redisie, warto wspomnieć, że istnieją także inne rozwiązania umożliwiające łatwiejsze wdrożenie cache'u w aplikacjach FastAPI. Przykładem może być biblioteka fastapi-cache, która pozwala na prostą konfigurację cache'u przy użyciu dekoratorów, a także obsługuje różne typy baz danych cache’ujących, w tym Redis. Dla zaawansowanych użytkowników Redis, warto zapoznać się z bardziej zaawansowanymi funkcjami klienta Redis dla Pythona, które pozwalają na precyzyjniejsze kontrolowanie cache’u, takich jak możliwość ustawiania różnych typów danych (np. listy, zestawy, hasze) oraz operacje na nich.

Warto również zwrócić uwagę na dodatkowe zagadnienia związane z przechowywaniem danych w Redisie. Przy wdrożeniu cache'u należy uwzględnić takie aspekty, jak czas życia danych (TTL) oraz zarządzanie pamięcią. Redis, mimo że jest bardzo szybki i efektywny, ma swoje ograniczenia, zwłaszcza w przypadku dużych zbiorów danych, które mogą wyczerpać dostępne zasoby pamięci. Dlatego przy większych aplikacjach warto zastanowić się nad stosowaniem rozwiązań, które zapewnią optymalizację pamięci i wydajności, takich jak strategie wymiany danych czy podział danych na różne segmenty.

Po zaimplementowaniu systemu cache'owania, warto pamiętać, że jego stosowanie nie wyklucza konieczności dalszej optymalizacji zapytań do baz danych, takich jak Elasticsearch. Chociaż cache może przyspieszyć czas odpowiedzi, to zapytania do samej bazy danych wciąż mogą być czasochłonne, szczególnie w przypadku bardzo dużych zbiorów danych. W takich przypadkach warto rozważyć dodatkowe techniki optymalizacji, takie jak agregacja danych w bazie danych, indeksowanie oraz inne zaawansowane strategie poprawiające wydajność zapytań.

Jak zaimplementować middleware i webhooki w FastAPI: Praktyczne podejście

Współczesne aplikacje webowe wymagają efektywnej komunikacji między różnymi systemami, a jedną z technologii umożliwiających tę interakcję są webhooki. Webhooki pozwalają na automatyczne przesyłanie danych z jednego systemu do drugiego w odpowiedzi na konkretne zdarzenia. Dzięki nim aplikacje mogą reagować na różne sytuacje w czasie rzeczywistym, co ma kluczowe znaczenie w nowoczesnym rozwoju oprogramowania.

FastAPI, jako nowoczesny framework do tworzenia API w Pythonie, oferuje szerokie możliwości integracji z różnymi systemami poprzez middleware i webhooki. Middleware to mechanizm umożliwiający przechwytywanie żądań HTTP i modyfikowanie ich, co pozwala na implementację funkcji takich jak weryfikacja hosta czy logowanie zdarzeń. Z kolei webhooki to rodzaj asynchronicznych wywołań, które umożliwiają powiadamianie innych systemów o zdarzeniach w naszej aplikacji.

Middleware do weryfikacji hostów

Pierwszym krokiem, który warto omówić, jest implementacja middleware, które pozwala na weryfikację poprawności hosta wysyłającego zapytanie do naszej aplikacji. FastAPI udostępnia wbudowaną klasę TrustedHostMiddleware, której zadaniem jest blokowanie zapytań od niepożądanych hostów. W tym przypadku, po dodaniu odpowiedniego middleware, aplikacja będzie akceptować zapytania jedynie z zaufanych hostów, co zwiększa bezpieczeństwo aplikacji.

Aby wprowadzić takie middleware, wystarczy dodać kod do aplikacji FastAPI, jak poniżej:

python
from fastapi.middleware.trustedhost import TrustedHostMiddleware
app.add_middleware( TrustedHostMiddleware, allowed_hosts=["localhost"], )

W praktyce oznacza to, że aplikacja nie zaakceptuje żadnych żądań, które nie będą pochodziły z hosta localhost. Można również skonfigurować aplikację, aby przyjęła więcej hostów, zależnie od potrzeb.

Aby przetestować działanie tego middleware, wystarczy uruchomić serwer aplikacji FastAPI, pozwalając jej być widocznej w sieci lokalnej, używając poniższej komendy:

bash
$ uvicorn main:app --host=0.0.0.0

Teraz aplikacja będzie dostępna w sieci lokalnej, jednak każde zapytanie spoza zaufanych hostów zostanie odrzucone, a użytkownik zobaczy komunikat o błędzie.

Implementacja webhooków

Webhooki stanowią kolejną kluczową funkcjonalność w budowie nowoczesnych aplikacji. Zwykle są to asynchroniczne powiadomienia, które wysyłają aplikacje do innych systemów, kiedy zachodzi określone zdarzenie. Dzięki nim systemy mogą wymieniać się informacjami w czasie rzeczywistym, co jest szczególnie przydatne w aplikacjach opartych na mikroserwisach, integracjach z zewnętrznymi usługami czy w systemach monitorowania.

Aby stworzyć webhook w FastAPI, najpierw musimy stworzyć system rejestracji URL-i, na które będą wysyłane powiadomienia. W tym celu możemy utworzyć specjalny endpoint, który będzie przechowywał zarejestrowane URL-e w stanie aplikacji, jak w poniższym przykładzie:

python
from fastapi import FastAPI, Body, Request app = FastAPI() @app.post("/register-webhook-url")
async def add_webhook_url(request: Request, url: str = Body()):
if not url.startswith("http"): url = f"http://{url}" request.state.webhook_urls.append(url) return {"url added": url}

Każdy nowy URL, który nie zaczyna się od http, zostanie automatycznie uzupełniony o protokół http://, a następnie zapisany w stanie aplikacji. Tego typu system może być wystarczający dla prostych aplikacji, choć w bardziej rozbudowanych projektach warto przechowywać te dane w bazie danych.

Kolejnym krokiem jest implementacja wywołań webhooków, czyli wysyłanie informacji do zarejestrowanych URL-i za każdym razem, gdy wystąpi określone zdarzenie w naszej aplikacji. Można to zrobić przy użyciu middleware, które będzie monitorować każde zapytanie do API, a następnie przesyłać odpowiednie informacje do zarejestrowanych URL-i. Oto jak to można zaimplementować:

python
from pydantic import BaseModel from httpx import AsyncClient import logging from asyncio import create_task from datetime import datetime from fastapi import Request from starlette.types import ASGIApp, Receive, Scope, Send client = AsyncClient() logger = logging.getLogger("uvicorn") class Event(BaseModel): host: str path: str time: str body: str | None = None
async def send_event_to_url(url: str, event: Event):
logger.info(
f"Sending event to {url}") try: await client.post(f"{url}/fastapi-webhook", json=event.dict()) except Exception as e: logger.error(f"Error sending webhook event to {url}: {e}") class WebhookSenderMiddleware: def __init__(self, app: ASGIApp): self.app = app
async def __call__(self, scope: Scope, receive: Receive, send: Send):
if scope["type"] == "http": message = await receive() body = message.get("body", b"") request = Request(scope=scope) event = Event( host=request.client.host, path=request.url.path, time=datetime.now().isoformat(), body=body, ) urls = request.state.webhook_urls for url in urls: await create_task(send_event_to_url(url, event)) await self.app(scope, receive, send)

W powyższym kodzie wykorzystujemy asynchroniczne wywołanie HTTP, aby wysłać dane do wszystkich zarejestrowanych URL-i. Middleware przechwytuje każde żądanie HTTP, tworzy obiekt Event zawierający informacje o żądaniu (takie jak adres hosta, ścieżka URL, czas oraz treść ciała zapytania), a następnie wysyła te dane do wszystkich zarejestrowanych URL-i.

Dodatkowe uwagi

Podczas implementacji webhooków należy pamiętać, że komunikacja między systemami musi odbywać się w sposób asynchroniczny, ponieważ oczekiwanie na odpowiedzi z zewnętrznych serwisów może znacznie wydłużyć czas odpowiedzi aplikacji. FastAPI, dzięki swojej architekturze asynchronicznej, doskonale nadaje się do takich zadań.

Warto również pamiętać o zabezpieczeniu webhooków przed nieautoryzowanym dostępem. Często stosuje się metody uwierzytelniania, takie jak tokeny API, które zapewniają, że tylko upoważnione systemy mogą rejestrować URL-e lub otrzymywać powiadomienia.