import curses import os import json from datetime import datetime class HendrikTUI: C_HEADER = 1 C_USER = 2 C_AI = 3 C_SYSTEM = 4 C_INPUT = 5 C_STATUS = 6 C_SEP = 7 C_ERROR = 8 C_INPUT_BORDER = 9 def __init__(self, llm_client, tools_definition, TOOLS, TOOL_HANDLERS, build_system_prompt, agent_max_iterations): self.llm = llm_client self.tools_def = tools_definition self.TOOLS = TOOLS self.TOOL_HANDLERS = TOOL_HANDLERS self.build_system_prompt = build_system_prompt self.agent_max_iterations = agent_max_iterations self.messages = None self.log = [] self.input_buffer = [""] self.input_line = 0 self.input_col = 0 self.scroll = 0 self.processing = False self.running = True self.h, self.w = 0, 0 def run(self): try: curses.wrapper(self._main) except KeyboardInterrupt: pass print("Exiting.") def _main(self, stdscr): curses.use_default_colors() self._init_colors() stdscr.keypad(True) stdscr.refresh() self.messages = [{"role": "system", "content": self.build_system_prompt(self.tools_def)}] while self.running: self.h, self.w = stdscr.getmaxyx() if self.h < 14 or self.w < 40: stdscr.erase() stdscr.addstr(0, 0, "Terminal too small (min 40x14)") stdscr.refresh() stdscr.getch() continue self._draw(stdscr) curses.curs_set(0 if self.processing else 2) try: key = stdscr.getch() except KeyboardInterrupt: break if not self.processing: self._handle_key(stdscr, key) # ---- colors ----------------------------------------------------------- def _init_colors(self): curses.init_pair(self.C_HEADER, curses.COLOR_BLACK, curses.COLOR_BLUE) curses.init_pair(self.C_USER, curses.COLOR_CYAN, -1) curses.init_pair(self.C_AI, curses.COLOR_GREEN, -1) curses.init_pair(self.C_SYSTEM, curses.COLOR_YELLOW, -1) curses.init_pair(self.C_INPUT, curses.COLOR_WHITE, -1) curses.init_pair(self.C_STATUS, curses.COLOR_BLACK, curses.COLOR_YELLOW) curses.init_pair(self.C_SEP, curses.COLOR_MAGENTA, -1) curses.init_pair(self.C_ERROR, curses.COLOR_RED, -1) curses.init_pair(self.C_INPUT_BORDER, curses.COLOR_BLUE, -1) # ---- main draw -------------------------------------------------------- def _draw(self, stdscr): stdscr.erase() self._draw_header(stdscr) self._draw_chat(stdscr) self._draw_status(stdscr) self._draw_input(stdscr) def _draw_header(self, stdscr): name = " Hendrik AI Agent " model = f" {self.llm.model} " mid = self.w - len(model) - 1 pad = max(1, mid - len(name) - 1) line = name + "\u2500" * pad + " " + model attr = curses.color_pair(self.C_HEADER) | curses.A_BOLD stdscr.addstr(0, 0, line[:self.w], attr) # ---- chat area -------------------------------------------------------- def _draw_chat(self, stdscr): chat_top = 1 chat_h = self.h - 10 if chat_h <= 0: return rendered = [] for item in self.log: role, text = item["role"], item["text"] if role == "sep": rendered.append((None, "\u2500" * self.w)) continue label = "" color = None if role == "user": label = f" You ({item['time']}) " color = self.C_USER elif role == "ai": label = f" Hendrik ({item['time']}) " color = self.C_AI elif role == "system": label = " \u25e6 " color = self.C_SYSTEM elif role == "error": label = " \u2717 " color = self.C_ERROR lines = text.split("\n") rendered.append((color, label + (lines[0] if lines else ""))) for line in lines[1:]: rendered.append((color, " " + line)) total = len(rendered) max_scroll = max(0, total - chat_h) if self.scroll > max_scroll: self.scroll = max_scroll self.scroll = max(0, self.scroll) y = chat_top for i in range(self.scroll, min(self.scroll + chat_h, total)): color, text = rendered[i] attr = curses.color_pair(color) if color else curses.A_NORMAL if len(text) >= self.w: text = text[: self.w - 1] try: stdscr.addstr(y, 0, text, attr) except curses.error: pass y += 1 # ---- input area ------------------------------------------------------- def _draw_input(self, stdscr): iy = self.h - 8 border_attr = curses.color_pair(self.C_INPUT_BORDER) | curses.A_BOLD for off, text in [ (0, "\u250c" + "\u2500" * (self.w - 2) + "\u2510"), (7, "\u2514" + "\u2500" * (self.w - 2) + "\u2518"), ]: try: stdscr.addstr(iy + off, 0, text[:self.w], border_attr) except curses.error: pass total = len(self.input_buffer) if total <= 6: show = 0 else: show = max(0, min(self.input_line - 3, total - 6)) cursor_yx = None text_attr = curses.color_pair(self.C_INPUT) for i in range(6): idx = show + i y = iy + 1 + i line = self.input_buffer[idx] if idx < total else "" max_text = self.w - 6 if len(line) > max_text: line = line[:max_text] try: stdscr.addstr(y, 0, "\u2502 ", border_attr) except curses.error: pass if idx < total: content = "> " + line + " " * (self.w - 4 - 2 - len(line)) try: stdscr.addstr(y, 2, content, text_attr) except curses.error: pass else: try: stdscr.addstr(y, 2, " " * (self.w - 4), border_attr) except curses.error: pass try: stdscr.addstr(y, self.w - 2, " \u2502", border_attr) except curses.error: pass if idx == self.input_line and not self.processing: cursor_yx = (y, 4 + min(self.input_col, max_text)) if cursor_yx: stdscr.move(*cursor_yx) # ---- status bar ------------------------------------------------------- def _draw_status(self, stdscr): y = self.h - 9 ws = os.getcwd() mode = " PROCESSING " if self.processing else " READY " hints = " ^D:send ^W:workspace ^C:exit " max_ws = self.w - len(mode) - len(hints) - 4 if len(ws) > max_ws: ws = ".." + ws[-(max_ws - 2):] status = f" {ws} \u2502{mode}\u2502{hints}" attr = curses.color_pair(self.C_STATUS) | curses.A_BOLD stdscr.addstr(y, 0, status[:self.w], attr) # ---- input handling --------------------------------------------------- def _handle_key(self, stdscr, key): if key == 4: self._submit(stdscr) elif key == 23: self._workspace_popup(stdscr) elif key == 3: self.running = False elif key == 12: self.log.clear() elif key in (curses.KEY_ENTER, 10, 13): self.input_buffer.insert(self.input_line + 1, "") self.input_line += 1 self.input_col = 0 elif key in (curses.KEY_BACKSPACE, 127): if self.input_col > 0: line = self.input_buffer[self.input_line] self.input_buffer[self.input_line] = ( line[: self.input_col - 1] + line[self.input_col :] ) self.input_col -= 1 elif self.input_line > 0: carry = self.input_buffer.pop(self.input_line) self.input_line -= 1 self.input_col = len(self.input_buffer[self.input_line]) self.input_buffer[self.input_line] += carry elif key == curses.KEY_UP: if self.input_line > 0: self.input_line -= 1 self.input_col = min( self.input_col, len(self.input_buffer[self.input_line]) ) elif key == curses.KEY_DOWN: if self.input_line < len(self.input_buffer) - 1: self.input_line += 1 self.input_col = min( self.input_col, len(self.input_buffer[self.input_line]) ) elif key == curses.KEY_LEFT: if self.input_col > 0: self.input_col -= 1 elif self.input_line > 0: self.input_line -= 1 self.input_col = len(self.input_buffer[self.input_line]) elif key == curses.KEY_RIGHT: if self.input_col < len(self.input_buffer[self.input_line]): self.input_col += 1 elif self.input_line < len(self.input_buffer) - 1: self.input_line += 1 self.input_col = 0 elif key == curses.KEY_HOME: self.input_col = 0 elif key == curses.KEY_END: self.input_col = len(self.input_buffer[self.input_line]) elif key == curses.KEY_PPAGE: self.scroll = max(0, self.scroll - (self.h - 10)) elif key == curses.KEY_NPAGE: self.scroll += self.h - 10 elif key == curses.KEY_RESIZE: pass elif key == 9: line = self.input_buffer[self.input_line] self.input_buffer[self.input_line] = ( line[: self.input_col] + " " + line[self.input_col :] ) self.input_col += 4 elif 32 <= key <= 255: ch = chr(key) line = self.input_buffer[self.input_line] self.input_buffer[self.input_line] = ( line[: self.input_col] + ch + line[self.input_col :] ) self.input_col += 1 # ---- submit & process ------------------------------------------------- def _log(self, role, text): self.log.append({ "role": role, "text": text, "time": datetime.now().strftime("%H:%M"), }) def _submit(self, stdscr): query = "\n".join(self.input_buffer).strip() if not query: return self._log("user", query) self.input_buffer = [""] self.input_line = 0 self.input_col = 0 self.scroll = 999999 self.processing = True self._draw(stdscr) stdscr.refresh() self.messages.append({"role": "user", "content": query}) for step in range(self.agent_max_iterations): self._log("system", f" step {step + 1} \u2014 LLM...") self.scroll = 999999 self._draw(stdscr) stdscr.refresh() response = self.llm.chat(self.messages, tools=self.TOOLS) self.log.pop() if response.tool_calls: amsg = { "role": "assistant", "content": response.content, "tool_calls": response.tool_calls, } self.messages.append(amsg) if response.content and response.content.strip(): self._log("ai", response.content) self.scroll = 999999 self._draw(stdscr) stdscr.refresh() for tc in response.tool_calls: tname = tc["function"]["name"] self._log("system", f" \u2192 {tname}") self.scroll = 999999 self._draw(stdscr) stdscr.refresh() self._execute_tool(tc) else: if response.content: self.messages.append({ "role": "assistant", "content": response.content, }) self._log("ai", response.content) self._log("sep", "") self.processing = False self.scroll = 999999 self._draw(stdscr) stdscr.refresh() return self._log("error", "Max iterations reached without final answer.") self.messages.append({"role": "assistant", "content": "Max iterations reached without final answer."}) self.processing = False def _execute_tool(self, tool_call): tname = tool_call["function"]["name"] targs = json.loads(tool_call["function"]["arguments"]) handler = self.TOOL_HANDLERS.get(tname) if not handler: result = f"Tool {tname} not found" else: try: if tname == "search_code": result = handler( pattern=targs["pattern"], search_type=targs["search_type"], path=targs.get("path", "."), ) elif tname == "git_operation": result = handler(args=targs["args"]) else: result = handler(**targs) except Exception as e: result = f"Error executing tool: {str(e)}" self.messages.append({ "role": "tool", "tool_call_id": tool_call["id"], "content": str(result), }) # ---- workspace popup -------------------------------------------------- def _workspace_popup(self, stdscr): pw = min(60, self.w - 4) ph = 3 px = (self.w - pw) // 2 py = self.h // 2 - 1 win = curses.newwin(ph, pw, py, px) win.box() win.addstr(0, 2, " Workspace path: ") win.addstr(1, 2, " " * (pw - 4)) curses.echo() ws = win.getstr(1, 2, pw - 5).decode("utf-8") curses.noecho() del win ws = ws.strip() if ws: resolved = os.path.abspath(ws) if os.path.isdir(resolved): os.chdir(resolved) self._log("system", f"Workspace \u2192 {resolved}") else: self._log("error", f"Invalid directory: {resolved}") stdscr.touchwin() stdscr.refresh()