Add curses-based TUI mode with --tui flag
This commit is contained in:
parent
25ec6fd996
commit
9c7e34409e
18
hendrik.py
18
hendrik.py
@ -143,9 +143,13 @@ def agent_loop(user_query, messages, llm_client):
|
||||
def main():
|
||||
workspace = None
|
||||
query_parts = []
|
||||
tui_mode = False
|
||||
i = 1
|
||||
while i < len(sys.argv):
|
||||
if sys.argv[i] in ('-w', '--workspace') and i + 1 < len(sys.argv):
|
||||
if sys.argv[i] == '--tui':
|
||||
tui_mode = True
|
||||
i += 1
|
||||
elif sys.argv[i] in ('-w', '--workspace') and i + 1 < len(sys.argv):
|
||||
workspace = sys.argv[i + 1]
|
||||
i += 2
|
||||
else:
|
||||
@ -165,6 +169,18 @@ def main():
|
||||
api_key=config.LLM_API_KEY
|
||||
)
|
||||
|
||||
if tui_mode:
|
||||
from scripts.tui import HendrikTUI
|
||||
HendrikTUI(
|
||||
llm_client=llm_client,
|
||||
tools_definition=tools_definition,
|
||||
TOOLS=TOOLS,
|
||||
TOOL_HANDLERS=TOOL_HANDLERS,
|
||||
build_system_prompt=gadget.build_system_prompt,
|
||||
agent_max_iterations=config.AGENT_MAX_ITERATIONS,
|
||||
).run()
|
||||
return
|
||||
|
||||
messages = None
|
||||
|
||||
if query_parts:
|
||||
|
||||
392
scripts/tui.py
Normal file
392
scripts/tui.py
Normal file
@ -0,0 +1,392 @@
|
||||
import curses
|
||||
import os
|
||||
import json
|
||||
from datetime import datetime
|
||||
|
||||
|
||||
class HendrikTUI:
|
||||
C_HEADER = 1
|
||||
C_USER = 2
|
||||
C_AI = 3
|
||||
C_SYSTEM = 4
|
||||
C_INPUT = 5
|
||||
C_STATUS = 6
|
||||
C_SEP = 7
|
||||
C_ERROR = 8
|
||||
|
||||
def __init__(self, llm_client, tools_definition, TOOLS, TOOL_HANDLERS,
|
||||
build_system_prompt, agent_max_iterations):
|
||||
self.llm = llm_client
|
||||
self.tools_def = tools_definition
|
||||
self.TOOLS = TOOLS
|
||||
self.TOOL_HANDLERS = TOOL_HANDLERS
|
||||
self.build_system_prompt = build_system_prompt
|
||||
self.agent_max_iterations = agent_max_iterations
|
||||
|
||||
self.messages = None
|
||||
self.log = []
|
||||
self.input_buffer = [""]
|
||||
self.input_line = 0
|
||||
self.input_col = 0
|
||||
self.scroll = 0
|
||||
self.processing = False
|
||||
self.running = True
|
||||
self.h, self.w = 0, 0
|
||||
|
||||
def run(self):
|
||||
curses.wrapper(self._main)
|
||||
|
||||
def _main(self, stdscr):
|
||||
curses.use_default_colors()
|
||||
self._init_colors()
|
||||
stdscr.keypad(True)
|
||||
stdscr.refresh()
|
||||
|
||||
self.messages = [{"role": "system",
|
||||
"content": self.build_system_prompt(self.tools_def)}]
|
||||
|
||||
while self.running:
|
||||
self.h, self.w = stdscr.getmaxyx()
|
||||
if self.h < 8 or self.w < 40:
|
||||
stdscr.erase()
|
||||
stdscr.addstr(0, 0, "Terminal too small (min 40x8)")
|
||||
stdscr.refresh()
|
||||
stdscr.getch()
|
||||
continue
|
||||
|
||||
self._draw(stdscr)
|
||||
curses.curs_set(0 if self.processing else 2)
|
||||
key = stdscr.getch()
|
||||
if not self.processing:
|
||||
self._handle_key(stdscr, key)
|
||||
|
||||
# ---- colors -----------------------------------------------------------
|
||||
|
||||
def _init_colors(self):
|
||||
curses.init_pair(self.C_HEADER, curses.COLOR_WHITE, curses.COLOR_BLUE)
|
||||
curses.init_pair(self.C_USER, curses.COLOR_CYAN, -1)
|
||||
curses.init_pair(self.C_AI, curses.COLOR_GREEN, -1)
|
||||
curses.init_pair(self.C_SYSTEM, curses.COLOR_YELLOW, -1)
|
||||
curses.init_pair(self.C_INPUT, curses.COLOR_WHITE, -1)
|
||||
curses.init_pair(self.C_STATUS, curses.COLOR_BLACK, curses.COLOR_WHITE)
|
||||
curses.init_pair(self.C_SEP, curses.COLOR_MAGENTA, -1)
|
||||
curses.init_pair(self.C_ERROR, curses.COLOR_RED, -1)
|
||||
|
||||
# ---- main draw --------------------------------------------------------
|
||||
|
||||
def _draw(self, stdscr):
|
||||
stdscr.erase()
|
||||
self._draw_header(stdscr)
|
||||
self._draw_chat(stdscr)
|
||||
self._draw_status(stdscr)
|
||||
self._draw_input(stdscr)
|
||||
|
||||
def _draw_header(self, stdscr):
|
||||
name = " Hendrik AI Agent "
|
||||
model = f" {self.llm.model} "
|
||||
mid = self.w - len(model) - 1
|
||||
pad = max(1, mid - len(name) - 1)
|
||||
line = name + "\u2500" * pad + " " + model
|
||||
attr = curses.color_pair(self.C_HEADER) | curses.A_BOLD
|
||||
stdscr.addstr(0, 0, line[:self.w], attr)
|
||||
|
||||
# ---- chat area --------------------------------------------------------
|
||||
|
||||
def _draw_chat(self, stdscr):
|
||||
chat_top = 1
|
||||
chat_h = self.h - 6
|
||||
if chat_h <= 0:
|
||||
return
|
||||
|
||||
rendered = []
|
||||
for item in self.log:
|
||||
role, text = item["role"], item["text"]
|
||||
if role == "sep":
|
||||
rendered.append((None, "\u2500" * self.w))
|
||||
continue
|
||||
label = ""
|
||||
color = None
|
||||
if role == "user":
|
||||
label = f" You ({item['time']}) "
|
||||
color = self.C_USER
|
||||
elif role == "ai":
|
||||
label = f" Hendrik ({item['time']}) "
|
||||
color = self.C_AI
|
||||
elif role == "system":
|
||||
label = " \u25e6 "
|
||||
color = self.C_SYSTEM
|
||||
elif role == "error":
|
||||
label = " \u2717 "
|
||||
color = self.C_ERROR
|
||||
|
||||
lines = text.split("\n")
|
||||
rendered.append((color, label + (lines[0] if lines else "")))
|
||||
for line in lines[1:]:
|
||||
rendered.append((color, " " + line))
|
||||
|
||||
total = len(rendered)
|
||||
max_scroll = max(0, total - chat_h)
|
||||
if self.scroll > max_scroll:
|
||||
self.scroll = max_scroll
|
||||
self.scroll = max(0, self.scroll)
|
||||
|
||||
y = chat_top
|
||||
for i in range(self.scroll, min(self.scroll + chat_h, total)):
|
||||
color, text = rendered[i]
|
||||
attr = curses.color_pair(color) if color else curses.A_NORMAL
|
||||
if len(text) >= self.w:
|
||||
text = text[: self.w - 1]
|
||||
try:
|
||||
stdscr.addstr(y, 0, text, attr)
|
||||
except curses.error:
|
||||
pass
|
||||
y += 1
|
||||
|
||||
# ---- input area -------------------------------------------------------
|
||||
|
||||
def _draw_input(self, stdscr):
|
||||
iy = self.h - 4
|
||||
blank = " " * (self.w - 2)
|
||||
|
||||
for off, text in [
|
||||
(0, "\u250c" + "\u2500" * (self.w - 2) + "\u2510"),
|
||||
(3, "\u2514" + "\u2500" * (self.w - 2) + "\u2518"),
|
||||
]:
|
||||
try:
|
||||
stdscr.addstr(iy + off, 0, text[:self.w])
|
||||
except curses.error:
|
||||
pass
|
||||
|
||||
total = len(self.input_buffer)
|
||||
if total <= 2:
|
||||
show = 0
|
||||
elif self.input_line <= 1:
|
||||
show = max(0, total - 2)
|
||||
elif self.input_line >= total - 2:
|
||||
show = total - 2
|
||||
else:
|
||||
show = self.input_line - 1
|
||||
|
||||
for i in range(2):
|
||||
idx = show + i
|
||||
y = iy + 1 + i
|
||||
prefix = "> " if idx < total else " "
|
||||
line = self.input_buffer[idx] if idx < total else ""
|
||||
vis = prefix + line
|
||||
if len(vis) > self.w - 2:
|
||||
vis = vis[: self.w - 2]
|
||||
fmt = "\u2502 " + vis + blank[len(vis):] + "\u2502"
|
||||
stdscr.addstr(y, 0, fmt, curses.color_pair(self.C_INPUT))
|
||||
if idx == self.input_line and not self.processing:
|
||||
col = min(2 + self.input_col, self.w - 3)
|
||||
stdscr.move(y, col)
|
||||
|
||||
# ---- status bar -------------------------------------------------------
|
||||
|
||||
def _draw_status(self, stdscr):
|
||||
y = self.h - 5
|
||||
ws = os.getcwd()
|
||||
mode = " PROCESSING " if self.processing else " READY "
|
||||
hints = " ^D:send ^W:workspace ^C:exit "
|
||||
max_ws = self.w - len(mode) - len(hints) - 4
|
||||
if len(ws) > max_ws:
|
||||
ws = ".." + ws[-(max_ws - 2):]
|
||||
status = f" {ws} \u2502{mode}\u2502{hints}"
|
||||
attr = curses.color_pair(self.C_STATUS)
|
||||
stdscr.addstr(y, 0, status[:self.w], attr)
|
||||
|
||||
# ---- input handling ---------------------------------------------------
|
||||
|
||||
def _handle_key(self, stdscr, key):
|
||||
if key == 4:
|
||||
self._submit()
|
||||
elif key == 23:
|
||||
self._workspace_popup(stdscr)
|
||||
elif key == 3:
|
||||
self.running = False
|
||||
elif key == 12:
|
||||
self.log.clear()
|
||||
elif key in (curses.KEY_ENTER, 10, 13):
|
||||
self.input_buffer.insert(self.input_line + 1, "")
|
||||
self.input_line += 1
|
||||
self.input_col = 0
|
||||
elif key in (curses.KEY_BACKSPACE, 127):
|
||||
if self.input_col > 0:
|
||||
line = self.input_buffer[self.input_line]
|
||||
self.input_buffer[self.input_line] = (
|
||||
line[: self.input_col - 1] + line[self.input_col :]
|
||||
)
|
||||
self.input_col -= 1
|
||||
elif self.input_line > 0:
|
||||
carry = self.input_buffer.pop(self.input_line)
|
||||
self.input_line -= 1
|
||||
self.input_col = len(self.input_buffer[self.input_line])
|
||||
self.input_buffer[self.input_line] += carry
|
||||
elif key == curses.KEY_UP:
|
||||
if self.input_line > 0:
|
||||
self.input_line -= 1
|
||||
self.input_col = min(
|
||||
self.input_col, len(self.input_buffer[self.input_line])
|
||||
)
|
||||
elif key == curses.KEY_DOWN:
|
||||
if self.input_line < len(self.input_buffer) - 1:
|
||||
self.input_line += 1
|
||||
self.input_col = min(
|
||||
self.input_col, len(self.input_buffer[self.input_line])
|
||||
)
|
||||
elif key == curses.KEY_LEFT:
|
||||
if self.input_col > 0:
|
||||
self.input_col -= 1
|
||||
elif self.input_line > 0:
|
||||
self.input_line -= 1
|
||||
self.input_col = len(self.input_buffer[self.input_line])
|
||||
elif key == curses.KEY_RIGHT:
|
||||
if self.input_col < len(self.input_buffer[self.input_line]):
|
||||
self.input_col += 1
|
||||
elif self.input_line < len(self.input_buffer) - 1:
|
||||
self.input_line += 1
|
||||
self.input_col = 0
|
||||
elif key == curses.KEY_HOME:
|
||||
self.input_col = 0
|
||||
elif key == curses.KEY_END:
|
||||
self.input_col = len(self.input_buffer[self.input_line])
|
||||
elif key == curses.KEY_PPAGE:
|
||||
self.scroll = max(0, self.scroll - (self.h - 6))
|
||||
elif key == curses.KEY_NPAGE:
|
||||
self.scroll += self.h - 6
|
||||
elif key == curses.KEY_RESIZE:
|
||||
pass
|
||||
elif key == 9:
|
||||
line = self.input_buffer[self.input_line]
|
||||
self.input_buffer[self.input_line] = (
|
||||
line[: self.input_col] + " " + line[self.input_col :]
|
||||
)
|
||||
self.input_col += 4
|
||||
elif 32 <= key <= 255:
|
||||
ch = chr(key)
|
||||
line = self.input_buffer[self.input_line]
|
||||
self.input_buffer[self.input_line] = (
|
||||
line[: self.input_col] + ch + line[self.input_col :]
|
||||
)
|
||||
self.input_col += 1
|
||||
|
||||
# ---- submit & process -------------------------------------------------
|
||||
|
||||
def _log(self, role, text):
|
||||
self.log.append({
|
||||
"role": role,
|
||||
"text": text,
|
||||
"time": datetime.now().strftime("%H:%M"),
|
||||
})
|
||||
|
||||
def _submit(self):
|
||||
query = "\n".join(self.input_buffer).strip()
|
||||
if not query:
|
||||
return
|
||||
|
||||
self._log("user", query)
|
||||
self.input_buffer = [""]
|
||||
self.input_line = 0
|
||||
self.input_col = 0
|
||||
self.scroll = 999999
|
||||
self.processing = True
|
||||
|
||||
self.messages.append({"role": "user", "content": query})
|
||||
|
||||
for step in range(self.agent_max_iterations):
|
||||
self._log("system", f" step {step + 1} \u2014 LLM...")
|
||||
self.scroll = 999999
|
||||
|
||||
response = self.llm.chat(self.messages, tools=self.TOOLS)
|
||||
|
||||
self.log.pop()
|
||||
|
||||
if response.tool_calls:
|
||||
amsg = {
|
||||
"role": "assistant",
|
||||
"content": response.content,
|
||||
"tool_calls": response.tool_calls,
|
||||
}
|
||||
self.messages.append(amsg)
|
||||
if response.content and response.content.strip():
|
||||
self._log("ai", response.content)
|
||||
self.scroll = 999999
|
||||
for tc in response.tool_calls:
|
||||
tname = tc["function"]["name"]
|
||||
targs = tc["function"]["arguments"]
|
||||
self._log("system", f" \u2192 {tname}")
|
||||
self.scroll = 999999
|
||||
self._execute_tool(tc)
|
||||
else:
|
||||
self.messages.append({
|
||||
"role": "assistant",
|
||||
"content": response.content,
|
||||
})
|
||||
self._log("ai", response.content)
|
||||
self._log("sep", "")
|
||||
self.processing = False
|
||||
self.scroll = 999999
|
||||
return
|
||||
|
||||
self._log("error", "Max iterations reached without final answer.")
|
||||
self.messages.append({"role": "assistant",
|
||||
"content": "Max iterations reached without final answer."})
|
||||
self.processing = False
|
||||
|
||||
def _execute_tool(self, tool_call):
|
||||
tname = tool_call["function"]["name"]
|
||||
targs = json.loads(tool_call["function"]["arguments"])
|
||||
handler = self.TOOL_HANDLERS.get(tname)
|
||||
|
||||
if not handler:
|
||||
result = f"Tool {tname} not found"
|
||||
else:
|
||||
try:
|
||||
if tname == "search_code":
|
||||
result = handler(
|
||||
pattern=targs["pattern"],
|
||||
search_type=targs["search_type"],
|
||||
path=targs.get("path", "."),
|
||||
)
|
||||
elif tname == "git_operation":
|
||||
result = handler(args=targs["args"])
|
||||
else:
|
||||
result = handler(**targs)
|
||||
except Exception as e:
|
||||
result = f"Error executing tool: {str(e)}"
|
||||
|
||||
self.messages.append({
|
||||
"role": "tool",
|
||||
"tool_call_id": tool_call["id"],
|
||||
"content": str(result),
|
||||
})
|
||||
|
||||
# ---- workspace popup --------------------------------------------------
|
||||
|
||||
def _workspace_popup(self, stdscr):
|
||||
pw = min(60, self.w - 4)
|
||||
ph = 3
|
||||
px = (self.w - pw) // 2
|
||||
py = self.h // 2 - 1
|
||||
|
||||
win = curses.newwin(ph, pw, py, px)
|
||||
win.box()
|
||||
win.addstr(0, 2, " Workspace path: ")
|
||||
win.addstr(1, 2, " " * (pw - 4))
|
||||
|
||||
curses.echo()
|
||||
ws = win.getstr(1, 2, pw - 5).decode("utf-8")
|
||||
curses.noecho()
|
||||
del win
|
||||
|
||||
ws = ws.strip()
|
||||
if ws:
|
||||
resolved = os.path.abspath(ws)
|
||||
if os.path.isdir(resolved):
|
||||
os.chdir(resolved)
|
||||
self._log("system", f"Workspace \u2192 {resolved}")
|
||||
else:
|
||||
self._log("error", f"Invalid directory: {resolved}")
|
||||
|
||||
stdscr.touchwin()
|
||||
stdscr.refresh()
|
||||
Loading…
Reference in New Issue
Block a user