Let’s build a simple AI agent in Python to illustrate the concept. The agent will (sketched as a bare loop below):
- Accept user input
- Plan steps to reach the goal
- Execute actions
- Learn or adapt slightly
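To make those four steps concrete before any real model is involved, here is a minimal sketch of the loop. plan_steps and execute are placeholder functions; later sections swap in a local LLM and a vector store.

def plan_steps(goal: str) -> list[str]:
    # Placeholder planner: a real agent would ask an LLM for a plan
    return [f"answer: {goal}"]

def execute(step: str) -> str:
    # Placeholder executor: a real agent would call tools or an LLM here
    return f"did: {step}"

memory = []  # the agent's (very small) memory

while True:
    goal = input("You: ")             # 1. accept user input
    if goal in {"quit", "exit"}:
        break
    for step in plan_steps(goal):     # 2. plan steps to reach the goal
        result = execute(step)        # 3. execute actions
        memory.append(result)         # 4. learn/adapt: remember outcomes
        print("Agent:", result)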
How are we going to make it happen? Let's break it down into small pieces.
A Python AI agent that:
- Takes your input or question.
- Uses a local open-source LLM (like Llama 2, Mistral 7B, Phi-2, or TinyLlama 1.1B).
- Plans steps or answers questions.
- Optionally stores knowledge.
Technologies:
- Python
- Ollama, LM Studio, or local Hugging Face Transformers (a quick Ollama smoke test follows below)
- FastAPI (optional, if you want a web API)
- LangChain (optional, for easy LLM chaining)
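Before building the full pipeline, it is worth sanity-checking the local LLM on its own. A minimal sketch, assuming Ollama is installed and the tinyllama model has already been pulled (ollama pull tinyllama):

from langchain_community.llms import Ollama

# Talks to the local Ollama server (default: http://localhost:11434)
llm = Ollama(model="tinyllama")
print(llm.invoke("In one sentence, what is an AI agent?"))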
Overview
1. Client uploads a PDF/image or asks a question → FastAPI generates a request_id → enqueues an ARQ job with that ID.
2. ARQ worker:
- Processes the PDF/image → stores the text → updates the vector DB.
- Stores the result in Redis with request_id as the key.
3. When done:
- The worker optionally pushes the result via WebSocket if the client is connected.
- Or the client can poll /result/{request_id} to get the status/result.
Requirements:
pip install fastapi uvicorn arq redis websockets langchain-community chromadb openai PyMuPDF pytesseract pillow
Redis must be running. You can either install the Redis server locally or run it with Docker:
docker run -p 6379:6379 redis
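You can confirm the connection from Python before starting the worker. A small sketch using the same redis.asyncio client the code below relies on:

import asyncio
import redis.asyncio as aioredis

async def check():
    redis = await aioredis.from_url("redis://localhost")
    print(await redis.ping())  # prints True when Redis is reachable
    await redis.close()

asyncio.run(check())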
ARQ Worker (worker.py)
import io
from arq.connections import RedisSettings
import fitz  # PyMuPDF
import pytesseract
from PIL import Image
import redis.asyncio as aioredis
from langchain_community.embeddings import OllamaEmbeddings
from langchain_community.vectorstores import Chroma
from langchain_community.llms import Ollama
from langchain.chains import ConversationalRetrievalChain

# Redis key under which each job stores its result
RESULT_KEY = lambda rid: f"result:{rid}"

# Embeddings, vector store, and local LLM (shared by all jobs)
embeddings = OllamaEmbeddings(model="tinyllama")
vectorstore = Chroma(persist_directory="./vectorstore", embedding_function=embeddings)
llm = Ollama(model="tinyllama")

async def process_pdf(ctx, file_bytes: bytes, request_id: str):
    # Extract text from every page and index it in the vector store
    pdf_doc = fitz.open(stream=file_bytes, filetype="pdf")
    text = "\n".join(page.get_text() for page in pdf_doc)
    vectorstore.add_texts([text], ids=[request_id])
    redis = await aioredis.from_url("redis://localhost")
    await redis.set(RESULT_KEY(request_id), f"PDF processed, chars={len(text)}")
    await redis.close()

async def process_image(ctx, file_bytes: bytes, request_id: str):
    # OCR the image with Tesseract and index the extracted text
    image = Image.open(io.BytesIO(file_bytes))
    text = pytesseract.image_to_string(image)
    vectorstore.add_texts([text], ids=[request_id])
    redis = await aioredis.from_url("redis://localhost")
    await redis.set(RESULT_KEY(request_id), f"Image processed, chars={len(text)}")
    await redis.close()

async def handle_question(ctx, question: str, request_id: str):
    # Retrieve relevant chunks and let the LLM answer; the chain expects
    # both the question and a chat history, so pass them as a dict
    retriever = vectorstore.as_retriever()
    qa_chain = ConversationalRetrievalChain.from_llm(llm, retriever=retriever)
    result = qa_chain({"question": question, "chat_history": []})
    redis = await aioredis.from_url("redis://localhost")
    await redis.set(RESULT_KEY(request_id), result["answer"])
    await redis.close()

class WorkerSettings:
    functions = [process_pdf, process_image, handle_question]
    redis_settings = RedisSettings()
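You can exercise the worker without the API by enqueuing a job directly. A minimal sketch, assuming the worker above is running and Redis is reachable on localhost; the question text and "test-request-1" ID are arbitrary examples:

import asyncio
from arq.connections import create_pool, RedisSettings

async def main():
    arq = await create_pool(RedisSettings())
    # Enqueue by function name, exactly as the FastAPI app does below
    await arq.enqueue_job("handle_question", "What do the uploaded documents say?", "test-request-1")

asyncio.run(main())

Once the job finishes, the answer shows up in Redis under the key result:test-request-1.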
FastAPI with request ID & result polling (main.py)
import uuid
from fastapi import FastAPI, UploadFile, WebSocket, WebSocketDisconnect
from pydantic import BaseModel
from arq.connections import create_pool
import redis.asyncio as aioredis

app = FastAPI()

# Connected WebSocket clients, keyed by client_id
clients = {}

@app.on_event("startup")
async def startup():
    app.state.arq = await create_pool()
    app.state.redis = await aioredis.from_url("redis://localhost")

@app.on_event("shutdown")
async def shutdown():
    await app.state.redis.close()

@app.post("/upload/pdf")
async def upload_pdf(file: UploadFile):
    file_bytes = await file.read()
    request_id = str(uuid.uuid4())
    await app.state.arq.enqueue_job("process_pdf", file_bytes, request_id)
    return {"status": "PDF queued", "request_id": request_id}

@app.post("/upload/image")
async def upload_image(file: UploadFile):
    file_bytes = await file.read()
    request_id = str(uuid.uuid4())
    await app.state.arq.enqueue_job("process_image", file_bytes, request_id)
    return {"status": "Image queued", "request_id": request_id}

class Question(BaseModel):
    question: str

@app.post("/ask")
async def ask(question: Question):
    request_id = str(uuid.uuid4())
    await app.state.arq.enqueue_job("handle_question", question.question, request_id)
    return {"status": "Question queued", "request_id": request_id}

@app.get("/result/{request_id}")
async def get_result(request_id: str):
    result = await app.state.redis.get(f"result:{request_id}")
    if result:
        return {"request_id": request_id, "result": result.decode()}
    return {"request_id": request_id, "status": "Processing"}

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: str):
    # Keep the connection open; drop the client from the registry on disconnect
    await websocket.accept()
    clients[client_id] = websocket
    try:
        while True:
            await websocket.receive_text()
    except WebSocketDisconnect:
        clients.pop(client_id, None)

@app.get("/")
def root():
    return {"status": "Multi-modal ARQ agent with request ID & polling"}
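The overview mentions an optional WebSocket push, which the code above leaves unwired. One possible way to add it, sketched under the assumption that the worker also publishes each result to a Redis channel named "results" with a JSON payload containing client_id and result (that publish call is not in the worker above), is a startup task that relays messages to connected clients:

import asyncio
import json

async def relay_results():
    # Bridge Redis pub/sub to connected WebSocket clients
    pubsub = app.state.redis.pubsub()
    await pubsub.subscribe("results")
    async for message in pubsub.listen():
        if message["type"] != "message":
            continue
        payload = json.loads(message["data"])  # assumed shape: {"client_id": ..., "result": ...}
        ws = clients.get(payload.get("client_id"))
        if ws:
            await ws.send_json(payload)

@app.on_event("startup")
async def start_relay():
    # Registered after the startup handler above, so app.state.redis exists
    asyncio.create_task(relay_results())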
Run everything
Start the API and the worker in two separate terminals:
uvicorn main:app --reload
arq worker.WorkerSettings
Here are example curl commands for each API endpoint in the system.
Upload a PDF:
curl -X POST http://localhost:8000/upload/pdf \
  -F "file=@/path/to/your/file.pdf"
Upload an Image:
curl -X POST http://localhost:8000/upload/image \
  -F "file=@/path/to/your/image.png"
Ask a Question (the Question model only accepts a question field):
curl -X POST http://localhost:8000/ask \
  -H "Content-Type: application/json" \
  -d '{"question": "What does the uploaded document say?"}'
Get Result by Request ID:
curl http://localhost:8000/result/<request_id>
Replace <request_id> with the actual UUID returned from the PDF/image/question request.
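The same flow works from Python. A small client sketch that uploads a PDF and polls until the worker has stored a result (assumes the requests package is installed; file.pdf is a placeholder path):

import time
import requests

BASE = "http://localhost:8000"

# Upload a PDF and grab the request_id from the response
with open("file.pdf", "rb") as f:
    resp = requests.post(f"{BASE}/upload/pdf", files={"file": f})
request_id = resp.json()["request_id"]

# Poll until the worker has written the result to Redis
while True:
    data = requests.get(f"{BASE}/result/{request_id}").json()
    if "result" in data:
        print(data["result"])
        break
    time.sleep(1)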
Hope you find it helpful!!