El desarrollo de aplicaciones en tiempo real requiere un enfoque cuidadoso en cuanto a la gestión de conexiones simultáneas. En este contexto, las conexiones WebSocket son fundamentales, ya que permiten una comunicación bidireccional constante entre el servidor y los clientes. Sin embargo, para que las aplicaciones que utilizan WebSockets funcionen de manera eficiente bajo cargas altas y con un alto nivel de seguridad, se deben considerar una serie de factores y técnicas. A continuación, se describe cómo crear un script básico para evaluar el rendimiento de WebSockets, seguido de sugerencias para optimizar y asegurar dichas conexiones.

Para comenzar, necesitamos un script para realizar un benchmarking o prueba de rendimiento. Este script debe realizar tres tareas principales:

  1. Definir una función para ejecutar el servidor FastAPI.

  2. Definir una función para conectar un número determinado de clientes al punto de WebSocket y permitir el intercambio de mensajes.

  3. Ejecutar el servidor en un proceso separado y, simultáneamente, ejecutar los clientes que interactúan con él.

Un ejemplo básico de la primera parte podría ser la siguiente función en Python, que ejecuta el servidor FastAPI:

python
import uvicorn from app.main import app def run_server(): uvicorn.run(app)

En este caso, run_server() es una alternativa a la ejecución del servidor desde la línea de comandos, como estamos acostumbrados a hacer con el comando uvicorn app.main:app. Esto nos permite integrar el inicio del servidor directamente en el script de prueba.

La segunda parte implica la creación de clientes que se conectan al servidor a través de WebSocket y envían mensajes. Utilizando asyncio y websockets, podemos definir una función para crear clientes que se conectan al servidor y simulan el comportamiento de múltiples usuarios. Un ejemplo básico sería:

python
import asyncio from websockets import connect async def connect_client(n: int, n_messages: int = 3): async with connect(f"ws://localhost:8000/chatroom/user{n}") as client: for _ in range(n_messages): await client.send(f"Hello World from user{n}") await asyncio.sleep(n * 0.1) await asyncio.sleep(2)

Aquí, connect_client simula un usuario que se conecta y envía mensajes, esperando un tiempo aleatorio entre los envíos para imitar el comportamiento humano en una conversación.

La tercera parte consiste en ejecutar todo esto en un único ciclo de eventos asincrónicos. Usamos multiprocessing para iniciar el servidor en un proceso separado mientras los clientes se conectan y ejecutan la prueba. El código sería algo así:

python
import multiprocessing async def main(n_clients: int = 10): p = multiprocessing.Process(target=run_server) p.start() await asyncio.sleep(1) connections = [connect_client(n) for n in range(n_clients)] await asyncio.gather(*connections) await asyncio.sleep(1) p.terminate()

El servidor se ejecuta en un proceso independiente, y los clientes se inician de forma simultánea. Posteriormente, se termina el proceso del servidor después de un corto periodo de espera.

El script completo puede ejecutarse fácilmente desde la línea de comandos, y al variar el número de clientes (n_clients), podremos observar cómo el servidor se comporta bajo diferentes cargas. Si superamos el límite de conexiones que el servidor puede manejar, comenzarán a aparecer errores de conexión. Esto indica que hemos alcanzado el umbral de capacidad del servidor.

Además de este script básico de benchmarking, se pueden implementar varias optimizaciones y prácticas recomendadas para mejorar el rendimiento de WebSocket en aplicaciones reales:

  1. Realizar pruebas unitarias para WebSockets utilizando TestClient: FastAPI proporciona una funcionalidad para realizar pruebas de WebSockets con su clase TestClient. Esto es útil para asegurarse de que los puntos de WebSocket funcionan correctamente durante el proceso de desarrollo y que el comportamiento del servidor no cambia inesperadamente.

  2. Manejo adecuado de errores: Es esencial implementar mecanismos para manejar excepciones durante la comunicación de WebSocket. Utilizar bloques try/except para capturar y manejar errores específicos es una práctica recomendada. Además, el uso de async for en lugar de while True al gestionar el intercambio de mensajes permite que los errores de desconexión se manejen de forma automática y eficiente.

  3. Uso de administradores de conexiones (connection pool managers): En aplicaciones que requieren manejar múltiples clientes simultáneamente, como los chats en tiempo real, es recomendable utilizar administradores de conexiones para mejorar el rendimiento y la mantenibilidad del código. Estos permiten gestionar un conjunto de conexiones reutilizables, evitando la creación y destrucción constante de nuevas conexiones.

Por otro lado, la seguridad es otro aspecto crucial que no debe pasarse por alto. En aplicaciones que requieren transmitir información sensible, es indispensable asegurar las conexiones WebSocket para prevenir accesos no autorizados, espionaje o manipulación de datos. Para ello, se puede implementar un mecanismo de autenticación utilizando OAuth2.

Implementando seguridad en conexiones WebSocket con OAuth2

Para proteger un endpoint de WebSocket, debemos integrar un sistema de autenticación basado en tokens, como se hace en las APIs HTTP. A pesar de que, al momento de escribir esto, FastAPI no soporta de forma directa la clase OAuth2PasswordBearer para WebSockets, se puede crear una clase derivada de esta para realizar la misma función. Un ejemplo de implementación de una clase personalizada para manejar la autenticación en WebSocket sería el siguiente:

python
from fastapi import WebSocket, WebSocketException, status from fastapi.security import OAuth2PasswordBearer class OAuth2WebSocketPasswordBearer(OAuth2PasswordBearer): async def __call__(self, websocket: WebSocket) -> str: authorization: str = websocket.headers.get("authorization") if not authorization: raise WebSocketException(code=status.HTTP_401_UNAUTHORIZED, reason="Not authenticated") scheme, param = authorization.split() if scheme.lower() != "bearer": raise WebSocketException(code=status.HTTP_403_FORBIDDEN, reason="Invalid authentication credentials") return param

Esta clase permite verificar el token de acceso en los encabezados de la conexión WebSocket. Posteriormente, se puede utilizar esta clase para recuperar el nombre de usuario asociado con el token y proteger el acceso a puntos específicos de la aplicación.

En resumen, optimizar y asegurar las conexiones WebSocket en aplicaciones FastAPI requiere una planificación adecuada, el uso de herramientas adecuadas para el benchmarking, y la implementación de prácticas de seguridad robustas. Estas medidas no solo mejoran el rendimiento de la aplicación, sino que también garantizan una experiencia de usuario confiable y segura.

¿Cómo integrar FastAPI con LangChain para crear asistentes inteligentes interactivos?

FastAPI es un framework que facilita la construcción de aplicaciones web rápidas y eficientes en Python. Al combinar FastAPI con LangChain, podemos crear aplicaciones inteligentes que interactúan con modelos de lenguaje y proporcionan respuestas dinámicas y contextuales. Esta integración permite que el modelo no solo responda preguntas, sino que también se enriquezca con datos externos y sea capaz de procesar información en tiempo real. En este capítulo, veremos cómo integrar FastAPI con LangChain para crear un asistente inteligente que interactúe con usuarios y proporcione respuestas personalizadas en el contexto de una tienda de productos electrónicos.

El proceso comienza con la configuración básica de FastAPI. Utilizamos un manejador de contexto asíncrono (asynccontextmanager) para establecer el ciclo de vida de la aplicación, que nos permitirá gestionar el estado y los mensajes de la conversación. Esta estructura se incorpora a la aplicación mediante el parámetro lifespan, que asegura que los mensajes del usuario se mantengan durante toda la interacción. La creación del primer endpoint en FastAPI es sencilla. A través de un POST en la ruta /query, podemos recibir las consultas de los usuarios y procesarlas utilizando el modelo de lenguaje. Para evitar respuestas erróneas por consultas vacías, se implementa una restricción de longitud mínima para la consulta.

En aplicaciones reales, como en el caso de un chatbot para una tienda de productos electrónicos, es útil contar con un endpoint adicional que devuelva todo el historial de mensajes intercambiados. De este modo, se mantiene el contexto de la conversación, lo cual mejora la interacción con el asistente. Además, otro endpoint como /restart-conversation puede permitir al usuario reiniciar la conversación sin conservar ningún dato anterior, lo que ofrece una experiencia fresca y limpia.

En cuanto a la integración con LangChain, este se presenta como un marco versátil para trabajar con modelos de lenguaje, facilitando su interacción con diversas fuentes de datos externas. Con LangChain, podemos construir aplicaciones más complejas y contextualizadas al conectar los modelos con documentos, bases de datos o cualquier otro sistema que provea datos adicionales.

El siguiente paso es la creación de un asistente inteligente utilizando LangChain y FastAPI. Para ello, se requiere una clave API de Cohere, que es uno de los proveedores de modelos de lenguaje. Una vez configurada la clave API, se instalan los paquetes necesarios, como langchain, chromadb, y otros componentes que permiten integrar y gestionar los modelos de lenguaje.

LangChain facilita la creación de cadenas de procesamiento que enlazan diversas funciones, como la definición de los prompts, la ingestión de documentos, y la construcción de la cadena del modelo que responde a las consultas del usuario. Los prompts son plantillas de mensajes que definen cómo se deben estructurar las interacciones con el modelo. En nuestro caso, la plantilla inicial se configura con dos mensajes: uno del sistema (donde se describe el rol del asistente) y otro del usuario (donde se proporciona la consulta).

La ingestión de documentos es esencial para mejorar la capacidad de respuesta del asistente. Para ello, los documentos deben ser vectorizados y almacenados de forma eficiente para permitir búsquedas rápidas. En este ejemplo, se utilizan documentos de preguntas frecuentes, los cuales se fragmentan y almacenan en una base de datos de vectores como Chroma DB. Este proceso facilita la recuperación de información relevante en función de la consulta del usuario, optimizando así la precisión y la relevancia de las respuestas del asistente.

Cuando un usuario realiza una consulta, el modelo de lenguaje se alimenta de la pregunta y del contexto proporcionado en los documentos vectorizados, lo que le permite generar una respuesta adecuada. A través de la cadena de LangChain, el flujo de trabajo conecta la consulta con los datos almacenados, lo que posibilita respuestas precisas y contextuales. Esta capacidad es esencial para aplicaciones como asistentes virtuales en comercio electrónico, donde los usuarios requieren respuestas rápidas y precisas sobre productos, disponibilidad, y características técnicas.

Es crucial también tener en cuenta la seguridad y privacidad de los datos del usuario. Aunque este ejemplo se centra en una tienda de productos electrónicos, cualquier aplicación que maneje información sensible debe implementar medidas adecuadas de protección de datos, como el cifrado de las comunicaciones y el almacenamiento seguro de la información. Además, para mejorar la experiencia del usuario, la personalización de las respuestas puede incluir la utilización de históricos de conversaciones previas, adaptando las respuestas a las necesidades y preferencias de cada cliente.

En producción, dependiendo de las necesidades y el presupuesto del proyecto, es importante seleccionar el modelo adecuado. Los modelos de Cohere, como se mencionó, son una opción popular, pero existen diversas alternativas en el mercado que pueden ofrecer distintos beneficios en términos de costo, rendimiento y capacidad de adaptación a diferentes escenarios.

Integrar FastAPI con LangChain no solo permite la creación de asistentes inteligentes, sino que también abre la puerta a una amplia gama de aplicaciones que requieren procesamiento en tiempo real, personalización y manejo de datos complejos. Esta capacidad de integrar modelos de lenguaje con diversas fuentes de información es clave para mejorar la interacción del usuario y proporcionar soluciones más inteligentes en el ámbito digital.

¿Cómo integrar FastAPI con LangChain para construir un asistente AI eficiente?

Integrar FastAPI con LangChain proporciona un entorno robusto para construir aplicaciones interactivas basadas en inteligencia artificial. LangChain es una biblioteca que facilita la creación de aplicaciones de procesamiento de lenguaje natural (NLP) mediante la conexión con modelos de AI, mientras que FastAPI se encarga de gestionar de manera eficiente las peticiones y respuestas a través de la web. A continuación, exploramos los pasos para construir un asistente AI que sea capaz de interactuar con los usuarios de forma efectiva, basado en documentos cargados y gestionados a través de un sistema de base de datos vectorial.

En primer lugar, es necesario crear un módulo llamado documents.py que contenga la función load_documents, la cual se encargará de cargar los archivos en una variable. Para ello, utilizamos DirectoryLoader para cargar los documentos desde una carpeta específica, y el CharacterTextSplitter para dividir el contenido en fragmentos de texto manejables, como se muestra en el siguiente código:

python
from langchain.text_splitter import CharacterTextSplitter from langchain_community.document_loaders import DirectoryLoader from langchain_community.vectorstores import Chroma async def load_documents(db: Chroma): text_splitter = CharacterTextSplitter(chunk_size=100, chunk_overlap=0) raw_documents = DirectoryLoader("docs", "*.txt").load() chunks = text_splitter.split_documents(raw_documents) await db.aadd_documents(chunks)

Aquí, la clase DirectoryLoader se encarga de cargar todo el contenido de los archivos .txt de la carpeta docs. Luego, el objeto text_splitter reorganiza estos documentos en fragmentos de 100 caracteres, que se añaden a la base de datos Chroma. La base de datos vectorizada permite realizar una búsqueda por similitud entre la consulta del usuario y los fragmentos de texto para recuperar el contenido relevante.

Una vez que tenemos los documentos vectorizados, es posible utilizar la función get_context para recuperar el contexto necesario para alimentar al modelo de lenguaje. El siguiente fragmento muestra cómo implementamos esta función:

python
def get_context(user_query: str, db: Chroma) -> str: docs = db.similarity_search(user_query) return "\n\n".join(doc.page_content for doc in docs)

El proceso de vectorización convierte los documentos en representaciones numéricas llamadas embeddings. Al realizar una búsqueda por similitud (usando db.similarity_search), podemos obtener el contenido relevante para luego pasarlo como contexto al modelo de lenguaje.

Con el contexto disponible, podemos proceder a construir el modelo de cadena para interactuar con el asistente AI. El siguiente fragmento de código muestra cómo podemos crear un modelo usando Cohere, una de las herramientas más avanzadas para el procesamiento de lenguaje natural:

python
from dotenv import load_dotenv from langchain_cohere import ChatCohere load_dotenv() # Cargar las variables de entorno model = ChatCohere(model="command-r-plus")

Una vez configurado el modelo, podemos crear una cadena de procesamiento utilizando LangChain para conectar la entrada del usuario con el modelo y la salida esperada. Este proceso se hace a través de la siguiente cadena de procesamiento:

python
from langchain.schema import StrOutputParser from prompting import chat_prompt_template chain = (chat_prompt_template | model | StrOutputParser())

Ahora que tenemos la cadena configurada, el siguiente paso es crear un punto de acceso (endpoint) en la API de FastAPI para interactuar con el asistente AI. Es importante tener en cuenta que la carga de documentos puede ser intensiva en CPU, por lo que se debe definir un gestor de contexto (lifespan) para ejecutar este proceso solo al inicio del servidor. Esto se logra mediante el siguiente código:

python
from contextlib import asynccontextmanager from fastapi import FastAPI from langchain_community.vectorstores import Chroma from documents import load_documents @asynccontextmanager async def lifespan(app: FastAPI): db = Chroma(embedding_function=CohereEmbeddings()) await load_documents(db) yield {"db": db}

Una vez creado el gestor de contexto, podemos pasar la instancia de FastAPI, que se configurará para usar dicho lifespan. Esto garantiza que la base de datos solo se cargue una vez al inicio, evitando sobrecargar el servidor con tareas repetitivas. A continuación, definimos el punto de acceso para manejar las consultas de los usuarios:

python
from fastapi import Body, Request @app.post("/message") async def query_assistant(request: Request, question: str = Body(...)) -> str: context = get_context(question, request.state.db) response = await chain.ainvoke({ "question": question, "context": context }) return response

Este punto de acceso recibirá una consulta del usuario, buscará el contexto relevante en la base de datos y devolverá una respuesta del modelo basado en ese contexto. Para poner en marcha el servidor, basta con ejecutar el siguiente comando:

bash
$ uvicorn main:app

Una vez iniciado el servidor, se puede acceder a la documentación interactiva en http://localhost:8000/docs y probar el endpoint /message para enviar consultas. Por ejemplo, si la consulta es sobre el tipo de pagos aceptados por una empresa, el modelo debería proporcionar una respuesta precisa basada en los documentos cargados.

El proceso de integración de LangChain con FastAPI no solo facilita la creación de un asistente AI eficiente, sino que también ofrece una plataforma escalable para manejar grandes cantidades de datos y consultas complejas. Sin embargo, en aplicaciones reales, es crucial permitir la carga dinámica de nuevos documentos. Para ello, se puede implementar un nuevo endpoint /document que permita añadir archivos a la carpeta de documentos y recargar la base de datos.

Además de la implementación básica, hay aspectos clave que deben ser entendidos para mejorar la efectividad del sistema. Primero, es importante comprender cómo los modelos de lenguaje funcionan con embeddings y por qué una base de datos vectorial como Chroma es útil para almacenar representaciones de texto. También es relevante saber cómo optimizar el rendimiento al cargar documentos grandes, especialmente en un entorno de producción donde las solicitudes concurrentes pueden afectar la capacidad de respuesta del servidor.