commit b583b7546ad32a4c3a4e2d06fc0788dedc2da0cd Author: Dita Aji Pratama Date: Sat May 2 21:08:42 2026 +0700 First commit diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..f99b044 --- /dev/null +++ b/.env.example @@ -0,0 +1,14 @@ +# Environment Variables for AI Agent +# Copy to .env and modify as needed + +# LLM Configuration +LLM_BASE_URL=http://localhost:11434/v1 +LLM_MODEL=deepseek-r1:8b +LLM_API_KEY=ollama + +# Agent Configuration +AGENT_MAX_ITERATIONS=10 + +# Tool Configuration +MAX_TOOL_OUTPUT=4000 + diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ba2e933 --- /dev/null +++ b/.gitignore @@ -0,0 +1,4 @@ +.env +.venv +**/__pycache__ +*.pyc diff --git a/config.py b/config.py new file mode 100644 index 0000000..eabd8a0 --- /dev/null +++ b/config.py @@ -0,0 +1,14 @@ +import os +from dotenv import load_dotenv + +load_dotenv() + +# LLM Configuration +LLM_BASE_URL = os.getenv("LLM_BASE_URL", default="http://localhost:11434/v1") +LLM_MODEL = os.getenv("LLM_MODEL", default="deepseek-r1:8b") +LLM_API_KEY = os.getenv("LLM_API_KEY", default="ollama") +LLM_TIMEOUT = int(os.getenv("LLM_TIMEOUT", default="600")) +# Agent Configuration +AGENT_MAX_ITERATIONS = int(os.getenv("AGENT_MAX_ITERATIONS", default="10")) +# Tool Configuration (for future use) +MAX_TOOL_OUTPUT = int(os.getenv("MAX_TOOL_OUTPUT", default="4000")) diff --git a/hendrik.py b/hendrik.py new file mode 100644 index 0000000..0f01206 --- /dev/null +++ b/hendrik.py @@ -0,0 +1,89 @@ +import os, sys, json +import config +from llm_client import LLMClient +from tools import coder +from script import gadget + +tools_definition = [ + gadget.tools_mapping( coder.schema_read_file, coder.read_file ), + gadget.tools_mapping( coder.schema_write_file, coder.write_file ), + gadget.tools_mapping( coder.schema_edit_file, coder.edit_file ), + gadget.tools_mapping( coder.schema_run_bash, coder.run_bash ), + gadget.tools_mapping( coder.schema_search_code, coder.search_code ), + gadget.tools_mapping( coder.schema_git_operation, coder.git_operation ), +] + +TOOLS = [t["schema"] for t in tools_definition] # Schemas +TOOL_HANDLERS = {t["name"]: t["handler"] for t in tools_definition} # Map + +SYSTEM_PROMPT = """You are a coding agent that assists with software engineering tasks. You have access to the following tools: + +1. read_file: Read file contents with line numbers +2. write_file: Write content to a file (overwrites existing) +3. edit_file: Replace text in a file +4. run_bash: Execute bash commands +5. search_code: Search for files (glob) or file contents (regex) +6. git_operation: Run git commands + +Use tools by returning tool calls when needed. After receiving tool results, continue your reasoning. When you have the final answer, return it as plain text without tool calls.""" + +def agent_loop(user_query, llm_client): + messages = [ + {"role": "system" , "content": SYSTEM_PROMPT }, + {"role": "user" , "content": user_query } + ] + for _ in range(config.AGENT_MAX_ITERATIONS): + response = llm_client.chat(messages, tools=TOOLS) + if response.tool_calls: + assistant_msg = { + "role": "assistant", + "content": response.content, + "tool_calls": response.tool_calls + } + messages.append(assistant_msg) + for tool_call in response.tool_calls: + tool_name = tool_call['function']['name'] + tool_args = json.loads(tool_call['function']['arguments']) + handler = TOOL_HANDLERS.get(tool_name) + if not handler: + result = f"Tool {tool_name} not found" + else: + try: + if tool_name == "search_code": + result = handler( + pattern=tool_args["pattern"], + search_type=tool_args["search_type"], + path=tool_args.get("path", ".") + ) + elif tool_name == "git_operation": + result = handler(args=tool_args["args"]) + else: + result = handler(**tool_args) + except Exception as e: + result = f"Error executing tool: {str(e)}" + messages.append({ + "role": "tool", + "tool_call_id": tool_call['id'], + "content": str(result) + }) + else: + return response.content + return "Max iterations reached without final answer." + +def main(): + llm_client = LLMClient(base_url=config.LLM_BASE_URL, model=config.LLM_MODEL, api_key=config.LLM_API_KEY) + if len(sys.argv) > 1: + user_query = " ".join(sys.argv[1:]) + else: + print("Enter your query (Ctrl+D to submit):") + user_query = sys.stdin.read().strip() + if not user_query: + print("No query provided.") + return + print("Thinking...") + final_answer = agent_loop(user_query, llm_client) + print("\nFinal Answer:") + print(final_answer) + +if __name__ == "__main__": + main() diff --git a/llm_client.py b/llm_client.py new file mode 100644 index 0000000..03ff885 --- /dev/null +++ b/llm_client.py @@ -0,0 +1,40 @@ +import json +import urllib.request +import urllib.error +from config import LLM_BASE_URL, LLM_MODEL, LLM_API_KEY, LLM_TIMEOUT + +class LLMClient: + class Message: + def __init__(self, msg): + self.content = msg.get('content', '') + self.tool_calls = msg.get('tool_calls', None) + + def __init__(self, base_url=LLM_BASE_URL, model=LLM_MODEL, api_key=LLM_API_KEY): + self.base_url = base_url.rstrip('/') + self.model = model + self.api_key = api_key + + def chat(self, messages, tools=None): + url = f"{self.base_url}/chat/completions" + payload = { + "model": self.model, + "messages": messages + } + if tools: + payload["tools"] = tools + payload["tool_choice"] = "auto" + + data = json.dumps(payload).encode('utf-8') + req = urllib.request.Request(url, data=data, method='POST') + req.add_header('Content-Type', 'application/json') + req.add_header('Authorization', f'Bearer {self.api_key}') + + try: + with urllib.request.urlopen(req, timeout=LLM_TIMEOUT) as resp: + response = json.loads(resp.read().decode('utf-8')) + message = response['choices'][0]['message'] + return self.Message(message) + except urllib.error.HTTPError as e: + return self.Message({'content': f"HTTP Error: {e.code} {e.reason}", 'tool_calls': None}) + except Exception as e: + return self.Message({'content': f"Error: {str(e)}", 'tool_calls': None}) diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..0eed382 --- /dev/null +++ b/requirements.txt @@ -0,0 +1 @@ +python-dotenv>=1.0.0 diff --git a/scripts/__init__.py b/scripts/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/scripts/gadget.py b/scripts/gadget.py new file mode 100644 index 0000000..fd70fde --- /dev/null +++ b/scripts/gadget.py @@ -0,0 +1,4 @@ +def tools_mapping(schema, handler, name=None): + tool_name = name or schema["function"]["name"] + return {"name": tool_name, "schema": schema, "handler": handler} + diff --git a/tools/__init__.py b/tools/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tools/coder.py b/tools/coder.py new file mode 100644 index 0000000..4e4929b --- /dev/null +++ b/tools/coder.py @@ -0,0 +1,166 @@ +import os +import subprocess +import re +import glob as glob_module + +schema_read_file = { + "type": "function", + "function": { + "name" : "read_file", + "description" : "Read the contents of a file. Returns the file content with line numbers prefixed.", + "parameters" : { + "type" : "object", + "properties" : { + "path": {"type": "string", "description": "Absolute path to the file to read"} + }, + "required" : ["path"] + } + } +} + +def read_file(path): + try: + with open(path, 'r', encoding='utf-8', errors='replace') as f: + lines = f.readlines() + return ''.join(f"{i+1}: {line}" for i, line in enumerate(lines)) + except Exception as e: + return f"Error reading file: {str(e)}" + +schema_write_file = { + "type": "function", + "function": { + "name" : "write_file", + "description" : "Write content to a file. Overwrites the file if it exists.", + "parameters" : { + "type" : "object", + "properties" : { + "path": {"type": "string", "description": "Absolute path to the file to write"}, + "content": {"type": "string", "description": "Content to write to the file"} + }, + "required" : ["path", "content"] + } + } +} + +def write_file(path, content): + try: + with open(path, 'w', encoding='utf-8') as f: + f.write(content) + return f"Successfully wrote to {path}" + except Exception as e: + return f"Error writing file: {str(e)}" + +schema_edit_file = { + "type": "function", + "function": { + "name" : "edit_file", + "description" : "Replace old_string with new_string in a file. If old_string is not found, returns error. If multiple matches, replaces all.", + "parameters" : { + "type" : "object", + "properties" : { + "path": {"type": "string", "description": "Absolute path to the file to edit"}, + "old_string": {"type": "string", "description": "String to replace"}, + "new_string": {"type": "string", "description": "Replacement string"} + }, + "required" : ["path", "old_string", "new_string"] + } + } +} + +def edit_file(path, old_string, new_string): + try: + with open(path, 'r', encoding='utf-8', errors='replace') as f: + content = f.read() + if old_string not in content: + return f"Error: old_string not found in {path}" + new_content = content.replace(old_string, new_string) + with open(path, 'w', encoding='utf-8') as f: + f.write(new_content) + return f"Successfully edited {path}" + except Exception as e: + return f"Error editing file: {str(e)}" + +schema_run_bash = { + "type": "function", + "function": { + "name" : "run_bash", + "description" : "Run a bash command. Returns stdout, stderr, and return code.", + "parameters" : { + "type" : "object", + "properties" : { + "command": {"type": "string", "description": "Bash command to execute"} + }, + "required" : ["command"] + } + } +} + +def run_bash(command): + try: + result = subprocess.run(command, shell=True, capture_output=True, text=True, timeout=30) + return f"stdout:\n{result.stdout}\nstderr:\n{result.stderr}\nreturn code: {result.returncode}" + except Exception as e: + return f"Error running command: {str(e)}" + +schema_search_code = { + "type": "function", + "function": { + "name": "search_code", + "description": "Search for files or content in files. Use type 'glob' to find files matching a pattern, 'content' to search file contents for a regex pattern.", + "parameters": { + "type": "object", + "properties": { + "pattern": {"type": "string", "description": "Glob pattern (for type 'glob') or regex pattern (for type 'content')"}, + "path": {"type": "string", "description": "Directory to search in (default: current working directory)", "default": "."}, + "search_type": {"type": "string", "enum": ["glob", "content"], "description": "Type of search: 'glob' for files, 'content' for file contents"} + }, + "required": ["pattern", "search_type"] + } + } +} + +def search_code(pattern, search_type, path="."): + try: + if search_type == "glob": + files = glob_module.glob(f"{path}/**/{pattern}", recursive=True) + return "\n".join(files) if files else "No files found" + elif search_type == "content": + results = [] + for root, dirs, files in os.walk(path): + for file in files: + file_path = os.path.join(root, file) + try: + with open(file_path, 'r', encoding='utf-8', errors='replace') as f: + for i, line in enumerate(f.readlines(), 1): + if re.search(pattern, line): + results.append(f"{file_path}:{i}: {line.strip()}") + except: + continue + return "\n".join(results[:50]) if results else "No matches found" + else: + return "Invalid search_type. Use 'glob' or 'content'." + except Exception as e: + return f"Error searching: {str(e)}" + +schema_git_operation = { + "type": "function", + "function": { + "name": "git_operation", + "description": "Run a git command. Pass the git arguments as a list (e.g., ['status', '--short'] for 'git status --short').", + "parameters": { + "type": "object", + "properties": { + "args": {"type": "array", "items": {"type": "string"}, "description": "List of git command arguments (without 'git' prefix)"} + }, + "required": ["args"] + } + } +} + +def git_operation(args): + try: + result = subprocess.run(["git", *args], capture_output=True, text=True, timeout=10) + return f"stdout:\n{result.stdout}\nstderr:\n{result.stderr}\nreturn code: {result.returncode}" + except Exception as e: + return f"Error running git command: {str(e)}" +