| author | blackhao <13851610112@163.com> | 2025-12-05 20:40:40 -0600 |
|---|---|---|
| committer | blackhao <13851610112@163.com> | 2025-12-05 20:40:40 -0600 |
| commit | d9868550e66fe8aaa7fff55a8e24b871ee51e3b1 | |
| tree | 147757f77def085c5649c4d930d5a51ff44a1e3d | |
| parent | d87c364dc43ca241fadc9dccbad9ec8896c93a1e | |
init: add project files and ignore secrets
Diffstat (limited to 'backend/app/main.py')
| -rw-r--r-- | backend/app/main.py | 85 |
1 file changed, 85 insertions(+), 0 deletions(-)
diff --git a/backend/app/main.py b/backend/app/main.py
new file mode 100644
index 0000000..48cb89f
--- /dev/null
+++ b/backend/app/main.py
@@ -0,0 +1,85 @@
+from fastapi import FastAPI, HTTPException
+from fastapi.middleware.cors import CORSMiddleware
+from fastapi.responses import StreamingResponse
+from app.schemas import NodeRunRequest, NodeRunResponse, MergeStrategy, Role, Message, Context
+from app.services.llm import llm_streamer
+from dotenv import load_dotenv
+import os
+
+load_dotenv()
+
+app = FastAPI(title="ContextFlow Backend")
+
+app.add_middleware(
+    CORSMiddleware,
+    allow_origins=["*"],
+    allow_credentials=True,
+    allow_methods=["*"],
+    allow_headers=["*"],
+)
+
+@app.get("/")
+def read_root():
+    return {"message": "ContextFlow Backend is running"}
+
+def smart_merge_messages(messages: list[Message]) -> list[Message]:
+    """
+    Merges messages using two steps:
+    1. Deduplication by ID (to handle diamond dependencies).
+    2. Merging consecutive messages from the same role.
+    """
+    if not messages:
+        return []
+
+    # 1. Deduplicate by ID, keeping order
+    seen_ids = set()
+    deduplicated = []
+    for msg in messages:
+        if msg.id not in seen_ids:
+            deduplicated.append(msg)
+            seen_ids.add(msg.id)
+
+    # 2. Merge consecutive roles
+    if not deduplicated:
+        return []
+
+    merged = []
+    current_msg = deduplicated[0].model_copy()
+
+    for next_msg in deduplicated[1:]:
+        if next_msg.role == current_msg.role:
+            # Merge content
+            current_msg.content += f"\n\n{next_msg.content}"
+            # Keep the latest timestamp
+            current_msg.timestamp = next_msg.timestamp
+        else:
+            merged.append(current_msg)
+            current_msg = next_msg.model_copy()
+
+    merged.append(current_msg)
+    return merged
+
+@app.post("/api/run_node_stream")
+async def run_node_stream(request: NodeRunRequest):
+    """
+    Stream the response from the LLM.
+    """
+    # 1. Concatenate all incoming contexts first
+    raw_messages = []
+    for ctx in request.incoming_contexts:
+        raw_messages.extend(ctx.messages)
+
+    # 2. Apply Merge Strategy
+    final_messages = []
+    if request.merge_strategy == MergeStrategy.SMART:
+        final_messages = smart_merge_messages(raw_messages)
+    else:
+        # RAW strategy: just keep them as is
+        final_messages = raw_messages
+
+    execution_context = Context(messages=final_messages)
+
+    return StreamingResponse(
+        llm_streamer(execution_context, request.user_prompt, request.config),
+        media_type="text/event-stream"
+    )
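The merge behaviour is easiest to see on a diamond dependency, where two branches both inherit the same root message. A minimal sketch, using a stand-in for the `Message` schema (the real one lives in `app/schemas.py`, which is not part of this diff; only the four fields `smart_merge_messages` touches are modelled, and `app.main` is assumed importable):

```python
from pydantic import BaseModel

from app.main import smart_merge_messages  # assumes app.main's own imports resolve

class Message(BaseModel):
    id: str
    role: str
    content: str
    timestamp: int

# Diamond dependency: both branches start from the same root message.
root = Message(id="m1", role="user", content="Hello", timestamp=1)
branch_a = [root, Message(id="m2", role="assistant", content="Reply from A", timestamp=2)]
branch_b = [root, Message(id="m3", role="assistant", content="Reply from B", timestamp=3)]

merged = smart_merge_messages(branch_a + branch_b)

# Step 1 drops the duplicate "m1"; step 2 folds the two now-consecutive
# assistant replies into one message and keeps the latest timestamp:
#   user      'Hello'                           1
#   assistant 'Reply from A\n\nReply from B'    3
for msg in merged:
    print(msg.role, repr(msg.content), msg.timestamp)
```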
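`app/services/llm.py` is also outside this diff, but `StreamingResponse(..., media_type="text/event-stream")` pins down the shape `llm_streamer` must have: an async generator yielding SSE-framed strings. A hedged sketch of one way to fill it in; the OpenAI client and the `config.model` field are assumptions, not part of this commit:

```python
import json
from openai import AsyncOpenAI

client = AsyncOpenAI()  # picks up OPENAI_API_KEY from the environment

async def llm_streamer(context, user_prompt, config):
    # Flatten the merged Context into chat-completion messages.
    # Role may be an enum in app.schemas, so unwrap .value if present.
    messages = [{"role": getattr(m.role, "value", m.role), "content": m.content}
                for m in context.messages]
    messages.append({"role": "user", "content": user_prompt})

    stream = await client.chat.completions.create(
        model=config.model,  # assumed field on the request config
        messages=messages,
        stream=True,
    )
    async for chunk in stream:
        delta = chunk.choices[0].delta.content
        if delta:
            # SSE framing: each event is "data: <payload>\n\n".
            yield f"data: {json.dumps({'token': delta})}\n\n"
    yield "data: [DONE]\n\n"
```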

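Finally, a usage sketch for the endpoint itself. The request body mirrors the fields `run_node_stream` reads (`incoming_contexts`, `merge_strategy`, `user_prompt`, `config`); the exact field names inside `config` and `Message`, and the `"smart"` enum value, are assumptions, since `app/schemas.py` is not part of this diff:

```python
import asyncio
import httpx

async def main():
    payload = {
        "incoming_contexts": [
            {"messages": [{"id": "m1", "role": "user",
                           "content": "Hello", "timestamp": 1}]},
        ],
        "merge_strategy": "smart",           # assumed MergeStrategy value
        "user_prompt": "Summarize the conversation so far.",
        "config": {"model": "gpt-4o-mini"},  # assumed config shape
    }
    async with httpx.AsyncClient() as client:
        async with client.stream(
            "POST", "http://localhost:8000/api/run_node_stream",
            json=payload, timeout=None,
        ) as resp:
            # Print each SSE event's payload as it arrives.
            async for line in resp.aiter_lines():
                if line.startswith("data: "):
                    print(line[len("data: "):])

asyncio.run(main())
```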