124 lines
3.7 KiB
Python
124 lines
3.7 KiB
Python
import json
|
|
import threading
|
|
from datetime import datetime
|
|
from scripts import ntro
|
|
|
|
WELCOME_ART = """\
|
|
\n\
|
|
╔══════════════════════════════════════════╗
|
|
║ ║
|
|
║ /\\_/\\ H E N D R I K ║
|
|
║ ( o.o ) AI Agent ║
|
|
║ > ^ < siap membantu! ║
|
|
║ ( ) ║
|
|
║ (___) ║
|
|
║ ║
|
|
╚══════════════════════════════════════════╝"""
|
|
|
|
|
|
def log(app, role, text):
|
|
app.log.append({
|
|
"role": role,
|
|
"text": text,
|
|
"time": datetime.now().strftime("%H:%M"),
|
|
})
|
|
|
|
|
|
def submit(app, stdscr):
|
|
query = "\n".join(app.input_buffer).strip()
|
|
if not query:
|
|
return
|
|
|
|
log(app, "user", query)
|
|
app.input_buffer = [""]
|
|
app.input_line = 0
|
|
app.input_col = 0
|
|
app.scroll = 999999
|
|
app.processing = True
|
|
|
|
app.messages.append({"role": "user", "content": query})
|
|
|
|
app.agent_done.clear()
|
|
app.agent_thread = threading.Thread(
|
|
target=_agent_loop,
|
|
args=(app,),
|
|
daemon=True,
|
|
)
|
|
app.agent_thread.start()
|
|
|
|
|
|
def _agent_loop(app):
|
|
stamp = ntro.start()
|
|
|
|
for step in range(app.agent_max_iterations):
|
|
stamp_step = ntro.start()
|
|
log(app, "system", f" step {step + 1} \u2014 Thinking...")
|
|
app.scroll = 999999
|
|
|
|
response = app.llm.chat(app.messages, tools=app.TOOLS)
|
|
|
|
app.log.pop()
|
|
|
|
if response.tool_calls:
|
|
amsg = {
|
|
"role": "assistant",
|
|
"content": response.content,
|
|
"tool_calls": response.tool_calls,
|
|
}
|
|
app.messages.append(amsg)
|
|
if response.content and response.content.strip():
|
|
log(app, "ai", response.content)
|
|
app.scroll = 999999
|
|
for tc in response.tool_calls:
|
|
tname = tc["function"]["name"]
|
|
log(app, "system", f" \u2192 {tname}")
|
|
app.scroll = 999999
|
|
execute_tool(app, tc)
|
|
else:
|
|
if response.content:
|
|
app.messages.append({
|
|
"role": "assistant",
|
|
"content": response.content,
|
|
})
|
|
log(app, "ai", response.content)
|
|
log(app, "sep", "")
|
|
ntro.end(stamp)
|
|
app.agent_done.set()
|
|
return
|
|
ntro.end(stamp_step)
|
|
|
|
log(app, "error", "Max iterations reached without final answer.")
|
|
app.messages.append({"role": "assistant",
|
|
"content": "Max iterations reached without final answer."})
|
|
ntro.end(stamp)
|
|
app.agent_done.set()
|
|
|
|
|
|
def execute_tool(app, tool_call):
|
|
tname = tool_call["function"]["name"]
|
|
targs = json.loads(tool_call["function"]["arguments"])
|
|
handler = app.TOOL_HANDLERS.get(tname)
|
|
|
|
if not handler:
|
|
result = f"Tool {tname} not found"
|
|
else:
|
|
try:
|
|
if tname == "search_code":
|
|
result = handler(
|
|
pattern=targs["pattern"],
|
|
search_type=targs["search_type"],
|
|
path=targs.get("path", "."),
|
|
)
|
|
elif tname == "git_operation":
|
|
result = handler(args=targs["args"])
|
|
else:
|
|
result = handler(**targs)
|
|
except Exception as e:
|
|
result = f"Error executing tool: {str(e)}"
|
|
|
|
app.messages.append({
|
|
"role": "tool",
|
|
"tool_call_id": tool_call["id"],
|
|
"content": str(result),
|
|
})
|