From 9c7e34409eb9e99c33da23d68b47da4c55ea5668 Mon Sep 17 00:00:00 2001 From: Dita Aji Pratama Date: Tue, 12 May 2026 10:00:57 +0700 Subject: [PATCH] Add curses-based TUI mode with --tui flag --- hendrik.py | 18 ++- scripts/tui.py | 392 +++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 409 insertions(+), 1 deletion(-) create mode 100644 scripts/tui.py diff --git a/hendrik.py b/hendrik.py index 9342f78..47a49b9 100644 --- a/hendrik.py +++ b/hendrik.py @@ -143,9 +143,13 @@ def agent_loop(user_query, messages, llm_client): def main(): workspace = None query_parts = [] + tui_mode = False i = 1 while i < len(sys.argv): - if sys.argv[i] in ('-w', '--workspace') and i + 1 < len(sys.argv): + if sys.argv[i] == '--tui': + tui_mode = True + i += 1 + elif sys.argv[i] in ('-w', '--workspace') and i + 1 < len(sys.argv): workspace = sys.argv[i + 1] i += 2 else: @@ -165,6 +169,18 @@ def main(): api_key=config.LLM_API_KEY ) + if tui_mode: + from scripts.tui import HendrikTUI + HendrikTUI( + llm_client=llm_client, + tools_definition=tools_definition, + TOOLS=TOOLS, + TOOL_HANDLERS=TOOL_HANDLERS, + build_system_prompt=gadget.build_system_prompt, + agent_max_iterations=config.AGENT_MAX_ITERATIONS, + ).run() + return + messages = None if query_parts: diff --git a/scripts/tui.py b/scripts/tui.py new file mode 100644 index 0000000..c5e5e81 --- /dev/null +++ b/scripts/tui.py @@ -0,0 +1,392 @@ +import curses +import os +import json +from datetime import datetime + + +class HendrikTUI: + C_HEADER = 1 + C_USER = 2 + C_AI = 3 + C_SYSTEM = 4 + C_INPUT = 5 + C_STATUS = 6 + C_SEP = 7 + C_ERROR = 8 + + def __init__(self, llm_client, tools_definition, TOOLS, TOOL_HANDLERS, + build_system_prompt, agent_max_iterations): + self.llm = llm_client + self.tools_def = tools_definition + self.TOOLS = TOOLS + self.TOOL_HANDLERS = TOOL_HANDLERS + self.build_system_prompt = build_system_prompt + self.agent_max_iterations = agent_max_iterations + + self.messages = None + self.log = [] + self.input_buffer = [""] + self.input_line = 0 + self.input_col = 0 + self.scroll = 0 + self.processing = False + self.running = True + self.h, self.w = 0, 0 + + def run(self): + curses.wrapper(self._main) + + def _main(self, stdscr): + curses.use_default_colors() + self._init_colors() + stdscr.keypad(True) + stdscr.refresh() + + self.messages = [{"role": "system", + "content": self.build_system_prompt(self.tools_def)}] + + while self.running: + self.h, self.w = stdscr.getmaxyx() + if self.h < 8 or self.w < 40: + stdscr.erase() + stdscr.addstr(0, 0, "Terminal too small (min 40x8)") + stdscr.refresh() + stdscr.getch() + continue + + self._draw(stdscr) + curses.curs_set(0 if self.processing else 2) + key = stdscr.getch() + if not self.processing: + self._handle_key(stdscr, key) + + # ---- colors ----------------------------------------------------------- + + def _init_colors(self): + curses.init_pair(self.C_HEADER, curses.COLOR_WHITE, curses.COLOR_BLUE) + curses.init_pair(self.C_USER, curses.COLOR_CYAN, -1) + curses.init_pair(self.C_AI, curses.COLOR_GREEN, -1) + curses.init_pair(self.C_SYSTEM, curses.COLOR_YELLOW, -1) + curses.init_pair(self.C_INPUT, curses.COLOR_WHITE, -1) + curses.init_pair(self.C_STATUS, curses.COLOR_BLACK, curses.COLOR_WHITE) + curses.init_pair(self.C_SEP, curses.COLOR_MAGENTA, -1) + curses.init_pair(self.C_ERROR, curses.COLOR_RED, -1) + + # ---- main draw -------------------------------------------------------- + + def _draw(self, stdscr): + stdscr.erase() + self._draw_header(stdscr) + self._draw_chat(stdscr) + self._draw_status(stdscr) + self._draw_input(stdscr) + + def _draw_header(self, stdscr): + name = " Hendrik AI Agent " + model = f" {self.llm.model} " + mid = self.w - len(model) - 1 + pad = max(1, mid - len(name) - 1) + line = name + "\u2500" * pad + " " + model + attr = curses.color_pair(self.C_HEADER) | curses.A_BOLD + stdscr.addstr(0, 0, line[:self.w], attr) + + # ---- chat area -------------------------------------------------------- + + def _draw_chat(self, stdscr): + chat_top = 1 + chat_h = self.h - 6 + if chat_h <= 0: + return + + rendered = [] + for item in self.log: + role, text = item["role"], item["text"] + if role == "sep": + rendered.append((None, "\u2500" * self.w)) + continue + label = "" + color = None + if role == "user": + label = f" You ({item['time']}) " + color = self.C_USER + elif role == "ai": + label = f" Hendrik ({item['time']}) " + color = self.C_AI + elif role == "system": + label = " \u25e6 " + color = self.C_SYSTEM + elif role == "error": + label = " \u2717 " + color = self.C_ERROR + + lines = text.split("\n") + rendered.append((color, label + (lines[0] if lines else ""))) + for line in lines[1:]: + rendered.append((color, " " + line)) + + total = len(rendered) + max_scroll = max(0, total - chat_h) + if self.scroll > max_scroll: + self.scroll = max_scroll + self.scroll = max(0, self.scroll) + + y = chat_top + for i in range(self.scroll, min(self.scroll + chat_h, total)): + color, text = rendered[i] + attr = curses.color_pair(color) if color else curses.A_NORMAL + if len(text) >= self.w: + text = text[: self.w - 1] + try: + stdscr.addstr(y, 0, text, attr) + except curses.error: + pass + y += 1 + + # ---- input area ------------------------------------------------------- + + def _draw_input(self, stdscr): + iy = self.h - 4 + blank = " " * (self.w - 2) + + for off, text in [ + (0, "\u250c" + "\u2500" * (self.w - 2) + "\u2510"), + (3, "\u2514" + "\u2500" * (self.w - 2) + "\u2518"), + ]: + try: + stdscr.addstr(iy + off, 0, text[:self.w]) + except curses.error: + pass + + total = len(self.input_buffer) + if total <= 2: + show = 0 + elif self.input_line <= 1: + show = max(0, total - 2) + elif self.input_line >= total - 2: + show = total - 2 + else: + show = self.input_line - 1 + + for i in range(2): + idx = show + i + y = iy + 1 + i + prefix = "> " if idx < total else " " + line = self.input_buffer[idx] if idx < total else "" + vis = prefix + line + if len(vis) > self.w - 2: + vis = vis[: self.w - 2] + fmt = "\u2502 " + vis + blank[len(vis):] + "\u2502" + stdscr.addstr(y, 0, fmt, curses.color_pair(self.C_INPUT)) + if idx == self.input_line and not self.processing: + col = min(2 + self.input_col, self.w - 3) + stdscr.move(y, col) + + # ---- status bar ------------------------------------------------------- + + def _draw_status(self, stdscr): + y = self.h - 5 + ws = os.getcwd() + mode = " PROCESSING " if self.processing else " READY " + hints = " ^D:send ^W:workspace ^C:exit " + max_ws = self.w - len(mode) - len(hints) - 4 + if len(ws) > max_ws: + ws = ".." + ws[-(max_ws - 2):] + status = f" {ws} \u2502{mode}\u2502{hints}" + attr = curses.color_pair(self.C_STATUS) + stdscr.addstr(y, 0, status[:self.w], attr) + + # ---- input handling --------------------------------------------------- + + def _handle_key(self, stdscr, key): + if key == 4: + self._submit() + elif key == 23: + self._workspace_popup(stdscr) + elif key == 3: + self.running = False + elif key == 12: + self.log.clear() + elif key in (curses.KEY_ENTER, 10, 13): + self.input_buffer.insert(self.input_line + 1, "") + self.input_line += 1 + self.input_col = 0 + elif key in (curses.KEY_BACKSPACE, 127): + if self.input_col > 0: + line = self.input_buffer[self.input_line] + self.input_buffer[self.input_line] = ( + line[: self.input_col - 1] + line[self.input_col :] + ) + self.input_col -= 1 + elif self.input_line > 0: + carry = self.input_buffer.pop(self.input_line) + self.input_line -= 1 + self.input_col = len(self.input_buffer[self.input_line]) + self.input_buffer[self.input_line] += carry + elif key == curses.KEY_UP: + if self.input_line > 0: + self.input_line -= 1 + self.input_col = min( + self.input_col, len(self.input_buffer[self.input_line]) + ) + elif key == curses.KEY_DOWN: + if self.input_line < len(self.input_buffer) - 1: + self.input_line += 1 + self.input_col = min( + self.input_col, len(self.input_buffer[self.input_line]) + ) + elif key == curses.KEY_LEFT: + if self.input_col > 0: + self.input_col -= 1 + elif self.input_line > 0: + self.input_line -= 1 + self.input_col = len(self.input_buffer[self.input_line]) + elif key == curses.KEY_RIGHT: + if self.input_col < len(self.input_buffer[self.input_line]): + self.input_col += 1 + elif self.input_line < len(self.input_buffer) - 1: + self.input_line += 1 + self.input_col = 0 + elif key == curses.KEY_HOME: + self.input_col = 0 + elif key == curses.KEY_END: + self.input_col = len(self.input_buffer[self.input_line]) + elif key == curses.KEY_PPAGE: + self.scroll = max(0, self.scroll - (self.h - 6)) + elif key == curses.KEY_NPAGE: + self.scroll += self.h - 6 + elif key == curses.KEY_RESIZE: + pass + elif key == 9: + line = self.input_buffer[self.input_line] + self.input_buffer[self.input_line] = ( + line[: self.input_col] + " " + line[self.input_col :] + ) + self.input_col += 4 + elif 32 <= key <= 255: + ch = chr(key) + line = self.input_buffer[self.input_line] + self.input_buffer[self.input_line] = ( + line[: self.input_col] + ch + line[self.input_col :] + ) + self.input_col += 1 + + # ---- submit & process ------------------------------------------------- + + def _log(self, role, text): + self.log.append({ + "role": role, + "text": text, + "time": datetime.now().strftime("%H:%M"), + }) + + def _submit(self): + query = "\n".join(self.input_buffer).strip() + if not query: + return + + self._log("user", query) + self.input_buffer = [""] + self.input_line = 0 + self.input_col = 0 + self.scroll = 999999 + self.processing = True + + self.messages.append({"role": "user", "content": query}) + + for step in range(self.agent_max_iterations): + self._log("system", f" step {step + 1} \u2014 LLM...") + self.scroll = 999999 + + response = self.llm.chat(self.messages, tools=self.TOOLS) + + self.log.pop() + + if response.tool_calls: + amsg = { + "role": "assistant", + "content": response.content, + "tool_calls": response.tool_calls, + } + self.messages.append(amsg) + if response.content and response.content.strip(): + self._log("ai", response.content) + self.scroll = 999999 + for tc in response.tool_calls: + tname = tc["function"]["name"] + targs = tc["function"]["arguments"] + self._log("system", f" \u2192 {tname}") + self.scroll = 999999 + self._execute_tool(tc) + else: + self.messages.append({ + "role": "assistant", + "content": response.content, + }) + self._log("ai", response.content) + self._log("sep", "") + self.processing = False + self.scroll = 999999 + return + + self._log("error", "Max iterations reached without final answer.") + self.messages.append({"role": "assistant", + "content": "Max iterations reached without final answer."}) + self.processing = False + + def _execute_tool(self, tool_call): + tname = tool_call["function"]["name"] + targs = json.loads(tool_call["function"]["arguments"]) + handler = self.TOOL_HANDLERS.get(tname) + + if not handler: + result = f"Tool {tname} not found" + else: + try: + if tname == "search_code": + result = handler( + pattern=targs["pattern"], + search_type=targs["search_type"], + path=targs.get("path", "."), + ) + elif tname == "git_operation": + result = handler(args=targs["args"]) + else: + result = handler(**targs) + except Exception as e: + result = f"Error executing tool: {str(e)}" + + self.messages.append({ + "role": "tool", + "tool_call_id": tool_call["id"], + "content": str(result), + }) + + # ---- workspace popup -------------------------------------------------- + + def _workspace_popup(self, stdscr): + pw = min(60, self.w - 4) + ph = 3 + px = (self.w - pw) // 2 + py = self.h // 2 - 1 + + win = curses.newwin(ph, pw, py, px) + win.box() + win.addstr(0, 2, " Workspace path: ") + win.addstr(1, 2, " " * (pw - 4)) + + curses.echo() + ws = win.getstr(1, 2, pw - 5).decode("utf-8") + curses.noecho() + del win + + ws = ws.strip() + if ws: + resolved = os.path.abspath(ws) + if os.path.isdir(resolved): + os.chdir(resolved) + self._log("system", f"Workspace \u2192 {resolved}") + else: + self._log("error", f"Invalid directory: {resolved}") + + stdscr.touchwin() + stdscr.refresh()