Refactor: restructure project into TUI modules

This commit is contained in:
Dita Aji Pratama 2026-05-19 10:35:14 +07:00
parent b819e871a4
commit de89ad2706
10 changed files with 447 additions and 640 deletions

View File

@ -4,10 +4,10 @@ from dotenv import load_dotenv
load_dotenv()
# LLM Configuration
LLM_BASE_URL = os.getenv("LLM_BASE_URL", default="http://localhost:11434/v1")
LLM_MODEL = os.getenv("LLM_MODEL", default="granite4.1:8b")
LLM_API_KEY = os.getenv("LLM_API_KEY", default="ollama")
LLM_TIMEOUT = int(os.getenv("LLM_TIMEOUT", default="600"))
llm_baseurl = os.getenv("LLM_BASE_URL", default="http://localhost:11434/v1" )
llm_model = os.getenv("LLM_MODEL", default="granite4.1:8b" )
llm_api_key = os.getenv("LLM_API_KEY", default="ollama" )
llm_timeout = int( os.getenv("LLM_TIMEOUT", default="600" ) )
# Agent Configuration
AGENT_MAX_ITERATIONS = int( os.getenv("AGENT_MAX_ITERATIONS", default="10" ) )
# Tool Configuration (for future use)

View File

@ -1,161 +1,39 @@
import os, sys, json, tty, termios
import os, sys
import config
from llm_client import LLMClient
from scripts.llm_client import LLMClient
from tools import coder
from scripts import gadget
from scripts.ansicolor import TEXT_COLOR_YELLOW, TEXT_COLOR_GREEN, TEXT_COLOR_RESET
from scripts.separator import separator
from tui import HendrikTUI
# Daftar tools yang tersedia
tools_definition = [
gadget.tools_mapping( coder.schema_read_file, coder.read_file ),
gadget.tools_mapping( coder.schema_write_file, coder.write_file ),
gadget.tools_mapping( coder.schema_edit_file, coder.edit_file ),
gadget.tools_mapping( coder.schema_run_bash, coder.run_bash ),
gadget.tools_mapping( coder.schema_search_code, coder.search_code ),
gadget.tools_mapping( coder.schema_git_operation, coder.git_operation ),
gadget.tools_mapping( schema = coder.schema_read_file, handler = coder.read_file ),
gadget.tools_mapping( schema = coder.schema_write_file, handler = coder.write_file ),
gadget.tools_mapping( schema = coder.schema_edit_file, handler = coder.edit_file ),
gadget.tools_mapping( schema = coder.schema_run_bash, handler = coder.run_bash ),
gadget.tools_mapping( schema = coder.schema_search_code, handler = coder.search_code ),
gadget.tools_mapping( schema = coder.schema_git_operation, handler = coder.git_operation ),
]
# Ekstrak dari tools_definition ke dua format berbeda
TOOLS = gadget.tool_schemas (tools_definition)
TOOL_HANDLERS = gadget.tool_handlers (tools_definition)
def interactive_input(header_mode='full'):
fd = sys.stdin.fileno()
old = termios.tcgetattr(fd)
print()
if header_mode == 'full':
print(separator())
print("Hendrik AI Agent - Interactive Mode")
print(separator())
print(f"Workspace: {os.getcwd()}")
print(separator())
print("[Ctrl+W] Change workspace | :workspace <dir> | [Ctrl+D] Submit")
print(separator())
elif header_mode == 'workspace':
print(separator())
print(f"Workspace: {os.getcwd()}")
print(separator())
else:
print("[Ctrl+W] Change workspace | :workspace <dir> | [Ctrl+D] Submit")
print(separator())
buffer = bytearray()
try:
tty.setraw(fd)
while True:
ch = os.read(fd, 1)
if ch == b'\x03': # Ctrl+C → exit
termios.tcsetattr(fd, termios.TCSADRAIN, old)
print("\r\nExiting.")
sys.exit(0)
elif ch == b'\x04': # Ctrl+D → submit
break
elif ch == b'\x17': # Ctrl+W → change workspace
termios.tcsetattr(fd, termios.TCSADRAIN, old)
print("\r\n", end="")
ws = input("Workspace directory: ").strip()
if ws:
resolved = os.path.abspath(ws)
if not os.path.isdir(resolved):
print(f"Error: '{resolved}' is not a valid directory")
else:
os.chdir(resolved)
print(f"\u2192 Workspace changed to {os.getcwd()}")
return interactive_input(header_mode='workspace')
elif ch in (b'\r', b'\n'): # Enter
buffer.extend(b'\n')
sys.stdout.buffer.write(b'\r\n')
sys.stdout.flush()
elif ch == b'\x7f': # Backspace
if buffer:
buffer.pop()
sys.stdout.buffer.write(b'\b \b')
sys.stdout.flush()
elif ch >= b' ': # Printable characters
buffer.extend(ch)
sys.stdout.buffer.write(ch)
sys.stdout.flush()
finally:
termios.tcsetattr(fd, termios.TCSADRAIN, old)
full_query = buffer.decode('utf-8', errors='replace').strip()
if full_query.startswith(':workspace '):
ws = full_query[11:].strip()
resolved = os.path.abspath(ws)
if not os.path.isdir(resolved):
print(f"Error: '{resolved}' is not a valid directory")
return interactive_input(header_mode=header_mode)
os.chdir(resolved)
print(f"\u2192 Workspace changed to {os.getcwd()}")
return interactive_input(header_mode='workspace')
return full_query
def agent_loop(user_query, messages, llm_client):
messages.append({"role": "user", "content": user_query})
for _ in range(config.AGENT_MAX_ITERATIONS):
response = llm_client.chat(messages, tools=TOOLS)
if response.tool_calls:
assistant_msg = {
"role": "assistant",
"content": response.content,
"tool_calls": response.tool_calls
}
messages.append(assistant_msg)
for tool_call in response.tool_calls:
tool_name = tool_call['function']['name']
tool_args = json.loads(tool_call['function']['arguments'])
handler = TOOL_HANDLERS.get(tool_name)
if not handler:
result = f"Tool {tool_name} not found"
else:
args_display = ", ".join(f"{k}={v!r}" for k, v in tool_args.items())
print(f" \u2192 {tool_name}({args_display})")
try:
if tool_name == "search_code":
result = handler(
pattern=tool_args["pattern"],
search_type=tool_args["search_type"],
path=tool_args.get("path", ".")
)
elif tool_name == "git_operation":
result = handler(args=tool_args["args"])
else:
result = handler(**tool_args)
except Exception as e:
result = f"Error executing tool: {str(e)}"
messages.append({
"role": "tool",
"tool_call_id": tool_call['id'],
"content": str(result)
})
else:
messages.append({"role": "assistant", "content": response.content})
return response.content, messages
msg = "Max iterations reached without final answer."
messages.append({"role": "assistant", "content": msg})
return msg, messages
def main():
llm_client = LLMClient(config.llm_baseurl, config.llm_model, config.llm_api_key, config.llm_timeout)
# Parsing arguments `-w <dir>` atau `--workspace <dir>`
workspace = None
query_parts = []
tui_mode = False
i = 1
while i < len(sys.argv):
if sys.argv[i] == '--tui':
tui_mode = True
i += 1
elif sys.argv[i] in ('-w', '--workspace') and i + 1 < len(sys.argv):
if sys.argv[i] in ('-w', '--workspace') and i + 1 < len(sys.argv):
workspace = sys.argv[i + 1]
i += 2
else:
query_parts.append(sys.argv[i])
i += 1
# Apply workspace jika ada
if workspace:
resolved = os.path.abspath(workspace)
if not os.path.isdir(resolved):
@ -163,14 +41,6 @@ def main():
sys.exit(1)
os.chdir(resolved)
llm_client = LLMClient(
base_url=config.LLM_BASE_URL,
model=config.LLM_MODEL,
api_key=config.LLM_API_KEY
)
if tui_mode:
from scripts.tui import HendrikTUI
HendrikTUI(
llm_client = llm_client,
tools_definition = tools_definition,
@ -178,42 +48,7 @@ def main():
TOOL_HANDLERS = TOOL_HANDLERS,
build_system_prompt = gadget.build_system_prompt,
agent_max_iterations = config.AGENT_MAX_ITERATIONS,
).run()
return
messages = None
if query_parts:
user_query = ' '.join(query_parts)
if not user_query:
print("No query provided.")
return
messages = [{"role": "system", "content": gadget.build_system_prompt(tools_definition)}]
print(f"\n{TEXT_COLOR_YELLOW}Thinking...{TEXT_COLOR_RESET}")
final_answer, messages = agent_loop(user_query, messages, llm_client)
print(f"\n{TEXT_COLOR_GREEN}Final Answer:{TEXT_COLOR_RESET}")
print(final_answer)
return
first_interaction = True
while True:
user_query = interactive_input(
header_mode='full' if first_interaction else 'compact'
)
first_interaction = False
if not user_query:
break
if user_query.lower() in ('/exit', '/quit'):
break
if messages is None:
messages = [{"role": "system", "content": gadget.build_system_prompt(tools_definition)}]
print(f"{TEXT_COLOR_YELLOW}Thinking...{TEXT_COLOR_RESET}")
final_answer, messages = agent_loop(user_query, messages, llm_client)
print(f"\n{TEXT_COLOR_GREEN}Final Answer:{TEXT_COLOR_RESET}")
print(final_answer)
).run() # Luncurkan TUI
if __name__ == "__main__":
main()

View File

@ -1,3 +0,0 @@
TEXT_COLOR_YELLOW = '\033[93m'
TEXT_COLOR_GREEN = '\033[92m'
TEXT_COLOR_RESET = '\033[0m'

View File

@ -1,7 +1,6 @@
import json
import urllib.request
import urllib.error
from config import LLM_BASE_URL, LLM_MODEL, LLM_API_KEY, LLM_TIMEOUT
class LLMClient:
class Message:
@ -9,10 +8,11 @@ class LLMClient:
self.content = msg.get('content', '')
self.tool_calls = msg.get('tool_calls', None)
def __init__(self, base_url=LLM_BASE_URL, model=LLM_MODEL, api_key=LLM_API_KEY):
def __init__(self, base_url, model, api_key, timeout=600):
self.base_url = base_url.rstrip('/')
self.model = model
self.api_key = api_key
self.timeout = timeout
def chat(self, messages, tools=None):
url = f"{self.base_url}/chat/completions"
@ -30,7 +30,7 @@ class LLMClient:
req.add_header('Authorization', f'Bearer {self.api_key}')
try:
with urllib.request.urlopen(req, timeout=LLM_TIMEOUT) as resp:
with urllib.request.urlopen(req, timeout=self.timeout) as resp:
response = json.loads(resp.read().decode('utf-8'))
message = response['choices'][0]['message']
return self.Message(message)

View File

@ -1,5 +0,0 @@
import shutil
def separator():
return "\u2500" * shutil.get_terminal_size().columns

View File

@ -1,431 +0,0 @@
import curses
import os
import json
from datetime import datetime
class HendrikTUI:
C_HEADER = 1
C_USER = 2
C_AI = 3
C_SYSTEM = 4
C_INPUT = 5
C_STATUS = 6
C_SEP = 7
C_ERROR = 8
C_INPUT_BORDER = 9
def __init__(self, llm_client, tools_definition, TOOLS, TOOL_HANDLERS,
build_system_prompt, agent_max_iterations):
self.llm = llm_client
self.tools_def = tools_definition
self.TOOLS = TOOLS
self.TOOL_HANDLERS = TOOL_HANDLERS
self.build_system_prompt = build_system_prompt
self.agent_max_iterations = agent_max_iterations
self.messages = None
self.log = []
self.input_buffer = [""]
self.input_line = 0
self.input_col = 0
self.scroll = 0
self.processing = False
self.running = True
self.h, self.w = 0, 0
def run(self):
try:
curses.wrapper(self._main)
except KeyboardInterrupt:
pass
print("Exiting.")
def _main(self, stdscr):
curses.use_default_colors()
self._init_colors()
stdscr.keypad(True)
stdscr.refresh()
self.messages = [{"role": "system",
"content": self.build_system_prompt(self.tools_def)}]
while self.running:
self.h, self.w = stdscr.getmaxyx()
if self.h < 14 or self.w < 40:
stdscr.erase()
stdscr.addstr(0, 0, "Terminal too small (min 40x14)")
stdscr.refresh()
stdscr.getch()
continue
self._draw(stdscr)
curses.curs_set(0 if self.processing else 2)
try:
key = stdscr.getch()
except KeyboardInterrupt:
break
if not self.processing:
self._handle_key(stdscr, key)
# ---- colors -----------------------------------------------------------
def _init_colors(self):
curses.init_pair(self.C_HEADER, curses.COLOR_BLACK, curses.COLOR_BLUE)
curses.init_pair(self.C_USER, curses.COLOR_CYAN, -1)
curses.init_pair(self.C_AI, curses.COLOR_GREEN, -1)
curses.init_pair(self.C_SYSTEM, curses.COLOR_YELLOW, -1)
curses.init_pair(self.C_INPUT, curses.COLOR_WHITE, -1)
curses.init_pair(self.C_STATUS, curses.COLOR_BLACK, curses.COLOR_YELLOW)
curses.init_pair(self.C_SEP, curses.COLOR_MAGENTA, -1)
curses.init_pair(self.C_ERROR, curses.COLOR_RED, -1)
curses.init_pair(self.C_INPUT_BORDER, curses.COLOR_BLUE, -1)
# ---- main draw --------------------------------------------------------
def _draw(self, stdscr):
stdscr.erase()
self._draw_header(stdscr)
self._draw_chat(stdscr)
self._draw_status(stdscr)
self._draw_input(stdscr)
def _draw_header(self, stdscr):
name = " Hendrik AI Agent "
model = f" {self.llm.model} "
mid = self.w - len(model) - 1
pad = max(1, mid - len(name) - 1)
line = name + "\u2500" * pad + " " + model
attr = curses.color_pair(self.C_HEADER) | curses.A_BOLD
stdscr.addstr(0, 0, line[:self.w], attr)
# ---- chat area --------------------------------------------------------
def _draw_chat(self, stdscr):
chat_top = 1
chat_h = self.h - 10
if chat_h <= 0:
return
rendered = []
for item in self.log:
role, text = item["role"], item["text"]
if role == "sep":
rendered.append((None, "\u2500" * self.w))
continue
label = ""
color = None
if role == "user":
label = f" You ({item['time']}) "
color = self.C_USER
elif role == "ai":
label = f" Hendrik ({item['time']}) "
color = self.C_AI
elif role == "system":
label = " \u25e6 "
color = self.C_SYSTEM
elif role == "error":
label = " \u2717 "
color = self.C_ERROR
lines = text.split("\n")
rendered.append((color, label + (lines[0] if lines else "")))
for line in lines[1:]:
rendered.append((color, " " + line))
total = len(rendered)
max_scroll = max(0, total - chat_h)
if self.scroll > max_scroll:
self.scroll = max_scroll
self.scroll = max(0, self.scroll)
y = chat_top
for i in range(self.scroll, min(self.scroll + chat_h, total)):
color, text = rendered[i]
attr = curses.color_pair(color) if color else curses.A_NORMAL
if len(text) >= self.w:
text = text[: self.w - 1]
try:
stdscr.addstr(y, 0, text, attr)
except curses.error:
pass
y += 1
# ---- input area -------------------------------------------------------
def _draw_input(self, stdscr):
iy = self.h - 8
border_attr = curses.color_pair(self.C_INPUT_BORDER) | curses.A_BOLD
for off, text in [
(0, "\u250c" + "\u2500" * (self.w - 2) + "\u2510"),
(7, "\u2514" + "\u2500" * (self.w - 2) + "\u2518"),
]:
try:
stdscr.addstr(iy + off, 0, text[:self.w], border_attr)
except curses.error:
pass
total = len(self.input_buffer)
if total <= 6:
show = 0
else:
show = max(0, min(self.input_line - 3, total - 6))
cursor_yx = None
text_attr = curses.color_pair(self.C_INPUT)
for i in range(6):
idx = show + i
y = iy + 1 + i
line = self.input_buffer[idx] if idx < total else ""
max_text = self.w - 6
if len(line) > max_text:
line = line[:max_text]
try:
stdscr.addstr(y, 0, "\u2502 ", border_attr)
except curses.error:
pass
if idx < total:
content = "> " + line + " " * (self.w - 4 - 2 - len(line))
try:
stdscr.addstr(y, 2, content, text_attr)
except curses.error:
pass
else:
try:
stdscr.addstr(y, 2, " " * (self.w - 4), border_attr)
except curses.error:
pass
try:
stdscr.addstr(y, self.w - 2, " \u2502", border_attr)
except curses.error:
pass
if idx == self.input_line and not self.processing:
cursor_yx = (y, 4 + min(self.input_col, max_text))
if cursor_yx:
stdscr.move(*cursor_yx)
# ---- status bar -------------------------------------------------------
def _draw_status(self, stdscr):
y = self.h - 9
ws = os.getcwd()
mode = " PROCESSING " if self.processing else " READY "
hints = " ^D:send ^W:workspace ^C:exit "
max_ws = self.w - len(mode) - len(hints) - 4
if len(ws) > max_ws:
ws = ".." + ws[-(max_ws - 2):]
status = f" {ws} \u2502{mode}\u2502{hints}"
attr = curses.color_pair(self.C_STATUS) | curses.A_BOLD
stdscr.addstr(y, 0, status[:self.w], attr)
# ---- input handling ---------------------------------------------------
def _handle_key(self, stdscr, key):
if key == 4:
self._submit(stdscr)
elif key == 23:
self._workspace_popup(stdscr)
elif key == 3:
self.running = False
elif key == 12:
self.log.clear()
elif key in (curses.KEY_ENTER, 10, 13):
self.input_buffer.insert(self.input_line + 1, "")
self.input_line += 1
self.input_col = 0
elif key in (curses.KEY_BACKSPACE, 127):
if self.input_col > 0:
line = self.input_buffer[self.input_line]
self.input_buffer[self.input_line] = (
line[: self.input_col - 1] + line[self.input_col :]
)
self.input_col -= 1
elif self.input_line > 0:
carry = self.input_buffer.pop(self.input_line)
self.input_line -= 1
self.input_col = len(self.input_buffer[self.input_line])
self.input_buffer[self.input_line] += carry
elif key == curses.KEY_UP:
if self.input_line > 0:
self.input_line -= 1
self.input_col = min(
self.input_col, len(self.input_buffer[self.input_line])
)
elif key == curses.KEY_DOWN:
if self.input_line < len(self.input_buffer) - 1:
self.input_line += 1
self.input_col = min(
self.input_col, len(self.input_buffer[self.input_line])
)
elif key == curses.KEY_LEFT:
if self.input_col > 0:
self.input_col -= 1
elif self.input_line > 0:
self.input_line -= 1
self.input_col = len(self.input_buffer[self.input_line])
elif key == curses.KEY_RIGHT:
if self.input_col < len(self.input_buffer[self.input_line]):
self.input_col += 1
elif self.input_line < len(self.input_buffer) - 1:
self.input_line += 1
self.input_col = 0
elif key == curses.KEY_HOME:
self.input_col = 0
elif key == curses.KEY_END:
self.input_col = len(self.input_buffer[self.input_line])
elif key == curses.KEY_PPAGE:
self.scroll = max(0, self.scroll - (self.h - 10))
elif key == curses.KEY_NPAGE:
self.scroll += self.h - 10
elif key == curses.KEY_RESIZE:
pass
elif key == 9:
line = self.input_buffer[self.input_line]
self.input_buffer[self.input_line] = (
line[: self.input_col] + " " + line[self.input_col :]
)
self.input_col += 4
elif 32 <= key <= 255:
ch = chr(key)
line = self.input_buffer[self.input_line]
self.input_buffer[self.input_line] = (
line[: self.input_col] + ch + line[self.input_col :]
)
self.input_col += 1
# ---- submit & process -------------------------------------------------
def _log(self, role, text):
self.log.append({
"role": role,
"text": text,
"time": datetime.now().strftime("%H:%M"),
})
def _submit(self, stdscr):
query = "\n".join(self.input_buffer).strip()
if not query:
return
self._log("user", query)
self.input_buffer = [""]
self.input_line = 0
self.input_col = 0
self.scroll = 999999
self.processing = True
self._draw(stdscr)
stdscr.refresh()
self.messages.append({"role": "user", "content": query})
for step in range(self.agent_max_iterations):
self._log("system", f" step {step + 1} \u2014 LLM...")
self.scroll = 999999
self._draw(stdscr)
stdscr.refresh()
response = self.llm.chat(self.messages, tools=self.TOOLS)
self.log.pop()
if response.tool_calls:
amsg = {
"role": "assistant",
"content": response.content,
"tool_calls": response.tool_calls,
}
self.messages.append(amsg)
if response.content and response.content.strip():
self._log("ai", response.content)
self.scroll = 999999
self._draw(stdscr)
stdscr.refresh()
for tc in response.tool_calls:
tname = tc["function"]["name"]
self._log("system", f" \u2192 {tname}")
self.scroll = 999999
self._draw(stdscr)
stdscr.refresh()
self._execute_tool(tc)
else:
if response.content:
self.messages.append({
"role": "assistant",
"content": response.content,
})
self._log("ai", response.content)
self._log("sep", "")
self.processing = False
self.scroll = 999999
self._draw(stdscr)
stdscr.refresh()
return
self._log("error", "Max iterations reached without final answer.")
self.messages.append({"role": "assistant",
"content": "Max iterations reached without final answer."})
self.processing = False
def _execute_tool(self, tool_call):
tname = tool_call["function"]["name"]
targs = json.loads(tool_call["function"]["arguments"])
handler = self.TOOL_HANDLERS.get(tname)
if not handler:
result = f"Tool {tname} not found"
else:
try:
if tname == "search_code":
result = handler(
pattern=targs["pattern"],
search_type=targs["search_type"],
path=targs.get("path", "."),
)
elif tname == "git_operation":
result = handler(args=targs["args"])
else:
result = handler(**targs)
except Exception as e:
result = f"Error executing tool: {str(e)}"
self.messages.append({
"role": "tool",
"tool_call_id": tool_call["id"],
"content": str(result),
})
# ---- workspace popup --------------------------------------------------
def _workspace_popup(self, stdscr):
pw = min(60, self.w - 4)
ph = 3
px = (self.w - pw) // 2
py = self.h // 2 - 1
win = curses.newwin(ph, pw, py, px)
win.box()
win.addstr(0, 2, " Workspace path: ")
win.addstr(1, 2, " " * (pw - 4))
curses.echo()
ws = win.getstr(1, 2, pw - 5).decode("utf-8")
curses.noecho()
del win
ws = ws.strip()
if ws:
resolved = os.path.abspath(ws)
if os.path.isdir(resolved):
os.chdir(resolved)
self._log("system", f"Workspace \u2192 {resolved}")
else:
self._log("error", f"Invalid directory: {resolved}")
stdscr.touchwin()
stdscr.refresh()

2
tui/__init__.py Normal file
View File

@ -0,0 +1,2 @@
# Re-export HendrikTUI agar import dari luar cukup: from tui import HendrikTUI
from .app import HendrikTUI

69
tui/app.py Normal file
View File

@ -0,0 +1,69 @@
import curses
from .render import init_colors, draw
from .input import handle_key
class HendrikTUI:
# State holder & orchestrator.
# Semua data UI disimpan di sini, method lain (render, input, agent)
# menerima `app` (self) sebagai parameter pertama.
def __init__(self, llm_client, tools_definition, TOOLS, TOOL_HANDLERS,
build_system_prompt, agent_max_iterations):
# -- dependencies (disuntik dari luar) --
self.llm = llm_client
self.tools_def = tools_definition
self.TOOLS = TOOLS
self.TOOL_HANDLERS = TOOL_HANDLERS
self.build_system_prompt = build_system_prompt
self.agent_max_iterations = agent_max_iterations
# -- UI state --
self.messages = None # chat history yg dikirim ke LLM API
self.log = [] # rendered log (display-only, ada timestamp)
self.input_buffer = [""] # baris-baris input multi-line
self.input_line = 0 # index baris aktif di buffer
self.input_col = 0 # kolom kursor di baris aktif
self.scroll = 0 # scroll offset chat area
self.processing = False # true saat agent sedang memproses
self.running = True # false → keluar dari main loop
self.h, self.w = 0, 0 # ukuran terminal (height, width)
def run(self):
# Masuk ke curses wrapper. wrapper() setup/teardown terminal
# dan menangani restore terminal meskipun terjadi error.
try:
curses.wrapper(self._main)
except KeyboardInterrupt:
pass
print("Exiting.")
def _main(self, stdscr):
# Main loop: draw → getch → handle
curses.use_default_colors()
init_colors()
stdscr.keypad(True) # enable key codes (KEY_UP, KEY_DOWN, dll)
stdscr.refresh()
# Init system prompt — sekali di awal
self.messages = [{"role": "system",
"content": self.build_system_prompt(self.tools_def)}]
while self.running:
self.h, self.w = stdscr.getmaxyx()
# Minimal ukuran terminal biar UI gak rusak
if self.h < 14 or self.w < 40:
stdscr.erase()
stdscr.addstr(0, 0, "Terminal too small (min 40x14)")
stdscr.refresh()
stdscr.getch()
continue
draw(self, stdscr)
# Sembunyikan kursor saat processing, tampilkan high visibility saat idle
curses.curs_set(0 if self.processing else 2)
try:
key = stdscr.getch()
except KeyboardInterrupt:
break
if not self.processing:
handle_key(self, stdscr, key)

126
tui/input.py Normal file
View File

@ -0,0 +1,126 @@
# input.py — Keyboard handling dan workspace popup.
# handle_key() adalah dispatch besar yang menerjemahkan
# key code curses menjadi aksi pada state app.
import curses
import os
from .agent import submit, log
def handle_key(app, stdscr, key):
# -- Ctrl shortcuts --
if key == 4: # Ctrl+D → submit query ke LLM
submit(app, stdscr)
elif key == 23: # Ctrl+W → popup ganti workspace
workspace_popup(app, stdscr)
elif key == 3: # Ctrl+C → exit
app.running = False
elif key == 12: # Ctrl+L → clear chat log
app.log.clear()
# -- Enter: buat baris baru di input buffer --
elif key in (curses.KEY_ENTER, 10, 13):
app.input_buffer.insert(app.input_line + 1, "")
app.input_line += 1
app.input_col = 0
# -- Backspace: hapus karakter sebelumnya atau gabung baris --
elif key in (curses.KEY_BACKSPACE, 127):
if app.input_col > 0:
line = app.input_buffer[app.input_line]
app.input_buffer[app.input_line] = (
line[: app.input_col - 1] + line[app.input_col :]
)
app.input_col -= 1
elif app.input_line > 0:
carry = app.input_buffer.pop(app.input_line)
app.input_line -= 1
app.input_col = len(app.input_buffer[app.input_line])
app.input_buffer[app.input_line] += carry
# -- Navigation arrows --
elif key == curses.KEY_UP:
if app.input_line > 0:
app.input_line -= 1
app.input_col = min(
app.input_col, len(app.input_buffer[app.input_line])
)
elif key == curses.KEY_DOWN:
if app.input_line < len(app.input_buffer) - 1:
app.input_line += 1
app.input_col = min(
app.input_col, len(app.input_buffer[app.input_line])
)
elif key == curses.KEY_LEFT:
if app.input_col > 0:
app.input_col -= 1
elif app.input_line > 0:
app.input_line -= 1
app.input_col = len(app.input_buffer[app.input_line])
elif key == curses.KEY_RIGHT:
if app.input_col < len(app.input_buffer[app.input_line]):
app.input_col += 1
elif app.input_line < len(app.input_buffer) - 1:
app.input_line += 1
app.input_col = 0
elif key == curses.KEY_HOME:
app.input_col = 0
elif key == curses.KEY_END:
app.input_col = len(app.input_buffer[app.input_line])
# -- Page Up / Down: scroll chat area --
elif key == curses.KEY_PPAGE:
app.scroll = max(0, app.scroll - (app.h - 10))
elif key == curses.KEY_NPAGE:
app.scroll += app.h - 10
# -- Resize terminal: tidak perlu action, next loop akan baca ukuran baru --
elif key == curses.KEY_RESIZE:
pass
# -- Tab: insert 4 spasi --
elif key == 9:
line = app.input_buffer[app.input_line]
app.input_buffer[app.input_line] = (
line[: app.input_col] + " " + line[app.input_col :]
)
app.input_col += 4
# -- Printable characters --
elif 32 <= key <= 255:
ch = chr(key)
line = app.input_buffer[app.input_line]
app.input_buffer[app.input_line] = (
line[: app.input_col] + ch + line[app.input_col :]
)
app.input_col += 1
def workspace_popup(app, stdscr):
# Overlay window kecil di tengah layar untuk input path workspace
pw = min(60, app.w - 4)
ph = 3
px = (app.w - pw) // 2
py = app.h // 2 - 1
win = curses.newwin(ph, pw, py, px)
win.box()
win.addstr(0, 2, " Workspace path: ")
win.addstr(1, 2, " " * (pw - 4))
curses.echo() # tampilkan input user
ws = win.getstr(1, 2, pw - 5).decode("utf-8")
curses.noecho()
del win
ws = ws.strip()
if ws:
resolved = os.path.abspath(ws)
if os.path.isdir(resolved):
os.chdir(resolved)
log(app, "system", f"Workspace \u2192 {resolved}")
else:
log(app, "error", f"Invalid directory: {resolved}")
stdscr.touchwin()
stdscr.refresh()

214
tui/render.py Normal file
View File

@ -0,0 +1,214 @@
# render.py — Semua fungsi drawing / tampilan curses.
# Setiap fungsi menerima `app` (instance HendrikTUI) dan `stdscr`
# lalu membaca state dari `app` untuk menggambar di layar.
import curses
import os
# -- Color pair IDs (id 1-9, id 0 = default curses) --
C_HEADER = 1 # header bar: biru
C_USER = 2 # user message: cyan
C_AI = 3 # AI response: hijau
C_SYSTEM = 4 # system log: kuning
C_INPUT = 5 # text input: putih
C_STATUS = 6 # status bar: hitam di atas kuning
C_STATUS_READY = 10 # status READY: hijau
C_STATUS_PROC = 11 # status PROCESSING: kuning
C_SEP = 7 # separator line: magenta
C_ERROR = 8 # error message: merah
C_INPUT_BORDER = 9 # border input box: biru
C_STATUS_INFO = 12 # status info (workspace/hints): putih
def init_colors():
# Daftarkan semua color pair sekali di awal.
# -1 = foreground/background default terminal.
curses.init_pair(C_HEADER, curses.COLOR_BLACK, curses.COLOR_BLUE)
curses.init_pair(C_USER, curses.COLOR_CYAN, -1)
curses.init_pair(C_AI, curses.COLOR_GREEN, -1)
curses.init_pair(C_SYSTEM, curses.COLOR_YELLOW, -1)
curses.init_pair(C_INPUT, curses.COLOR_WHITE, -1)
curses.init_pair(C_STATUS, curses.COLOR_BLACK, curses.COLOR_YELLOW)
curses.init_pair(C_STATUS_READY, curses.COLOR_WHITE, curses.COLOR_GREEN)
curses.init_pair(C_STATUS_PROC, curses.COLOR_BLACK, curses.COLOR_YELLOW)
curses.init_pair(C_STATUS_INFO, curses.COLOR_WHITE, -1)
curses.init_pair(C_SEP, curses.COLOR_MAGENTA, -1)
curses.init_pair(C_ERROR, curses.COLOR_RED, -1)
curses.init_pair(C_INPUT_BORDER, curses.COLOR_BLUE, -1)
def draw(app, stdscr):
# Panggil keempat fungsi gambar secara berurutan.
# Urutan penting: input digambar paling akhir supaya kursor
# bisa dipindah di atas layer paling atas.
draw_header(app, stdscr)
draw_chat(app, stdscr)
draw_status(app, stdscr)
draw_input(app, stdscr)
def draw_header(app, stdscr):
# Baris paling atas: " Hendrik AI Agent ─────── <model> "
w = app.w
name = " Hendrik AI Agent "
model = f" {app.llm.model} "
mid = w - len(model) - 1
pad = max(1, mid - len(name) - 1)
line = name + "\u2500" * pad + " " + model
attr = curses.color_pair(C_HEADER) | curses.A_BOLD
stdscr.addstr(0, 0, line[:w], attr)
def draw_chat(app, stdscr):
# Area chat — dari baris 1 sampai baris (h - 10).
# Bisa di-scroll dengan Page Up / Page Down.
# app.log berisi daftar item (role, text, time) untuk display.
h, w = app.h, app.w
chat_top = 1
chat_h = h - 10
if chat_h <= 0:
return
# Render log ke list of (color, text) agar scroll calculation akurat
rendered = []
for item in app.log:
role, text = item["role"], item["text"]
if role == "sep":
rendered.append((None, ""))
rendered.append((None, ""))
continue
label = ""
color = None
if role == "user":
label = f" You ({item['time']}) "
color = C_USER
elif role == "ai":
label = f" Hendrik ({item['time']}) "
color = C_AI
elif role == "system":
label = " \u25e6 "
color = C_SYSTEM
elif role == "error":
label = " \u2717 "
color = C_ERROR
lines = text.split("\n")
rendered.append((color, label + (lines[0] if lines else "")))
for line in lines[1:]:
rendered.append((color, " " + line))
# Clamp scroll agar tidak melebihi total baris
total = len(rendered)
max_scroll = max(0, total - chat_h)
if app.scroll > max_scroll:
app.scroll = max_scroll
app.scroll = max(0, app.scroll)
# Gambar baris yang terlihat (scroll sampai scroll + chat_h)
# Clear area chat dulu biar nggak ada text lama yang nyisa
for clear_y in range(chat_top, chat_top + chat_h):
try:
stdscr.addstr(clear_y, 0, " " * w, curses.A_NORMAL)
except curses.error:
pass
y = chat_top
for i in range(app.scroll, min(app.scroll + chat_h, total)):
color, text = rendered[i]
attr = curses.color_pair(color) if color else curses.A_NORMAL
if len(text) >= w:
text = text[: w - 1]
try:
stdscr.addstr(y, 0, text, attr)
except curses.error:
pass
y += 1
def draw_input(app, stdscr):
# Kotak input multi-line (maks 6 baris).
# Ada border atas dan bawah.
# Baris aktif ditandai "> " di depannya.
h, w = app.h, app.w
iy = h - 8
border_attr = curses.color_pair(C_INPUT_BORDER) | curses.A_BOLD
# Border atas ┌───┐ dan bawah └───┘
for off, text in [
(0, "\u250c" + "\u2500" * (w - 2) + "\u2510"),
(7, "\u2514" + "\u2500" * (w - 2) + "\u2518"),
]:
try:
stdscr.addstr(iy + off, 0, text[:w], border_attr)
except curses.error:
pass
# Scroll input horizontal jika buffer > 6 baris
total = len(app.input_buffer)
if total <= 6:
show = 0
else:
show = max(0, min(app.input_line - 3, total - 6))
cursor_yx = None
text_attr = curses.color_pair(C_INPUT)
for i in range(6):
idx = show + i
y = iy + 1 + i
line = app.input_buffer[idx] if idx < total else ""
max_text = w - 6
if len(line) > max_text:
line = line[:max_text]
# Border kiri
try:
stdscr.addstr(y, 0, "\u2502 ", border_attr)
except curses.error:
pass
# Isi baris
if idx < total:
content = "> " + line + " " * (w - 4 - 2 - len(line))
try:
stdscr.addstr(y, 2, content, text_attr)
except curses.error:
pass
else:
try:
stdscr.addstr(y, 2, " " * (w - 4), border_attr)
except curses.error:
pass
# Border kanan
try:
stdscr.addstr(y, w - 2, " \u2502", border_attr)
except curses.error:
pass
# Catat posisi kursor (hanya untuk baris aktif)
if idx == app.input_line and not app.processing:
cursor_yx = (y, 4 + min(app.input_col, max_text))
if cursor_yx:
stdscr.move(*cursor_yx)
def draw_status(app, stdscr):
# Status bar di baris h-9: workspace, mode (READY/PROCESSING), shortcut hints
h, w = app.h, app.w
y = h - 9
ws = os.getcwd()
mode = " PROCESSING " if app.processing else " READY "
hints = " ^D:send ^W:workspace ^C:exit "
max_ws = w - len(mode) - len(hints) - 4
if len(ws) > max_ws:
ws = ".." + ws[-(max_ws - 2):]
status = f" {ws} \u2502{mode}\u2502{hints}"
stdscr.addstr(y, 0, status[:w], curses.color_pair(C_STATUS_INFO))
# Highlight mode dengan warna berbeda
mode_start = len(f" {ws} \u2502")
mode_end = mode_start + len(mode)
mode_attr = curses.color_pair(C_STATUS_READY) if not app.processing else curses.color_pair(C_STATUS_PROC)
stdscr.addstr(y, mode_start, mode, mode_attr | curses.A_BOLD)