First commit

This commit is contained in:
Dita Aji Pratama 2026-05-02 21:08:42 +07:00
commit b583b7546a
10 changed files with 332 additions and 0 deletions

14
.env.example Normal file
View File

@ -0,0 +1,14 @@
# Environment Variables for AI Agent
# Copy to .env and modify as needed
# LLM Configuration
LLM_BASE_URL=http://localhost:11434/v1
LLM_MODEL=deepseek-r1:8b
LLM_API_KEY=ollama
# Agent Configuration
AGENT_MAX_ITERATIONS=10
# Tool Configuration
MAX_TOOL_OUTPUT=4000

4
.gitignore vendored Normal file
View File

@ -0,0 +1,4 @@
.env
.venv
**/__pycache__
*.pyc

14
config.py Normal file
View File

@ -0,0 +1,14 @@
import os
from dotenv import load_dotenv
load_dotenv()
# LLM Configuration
LLM_BASE_URL = os.getenv("LLM_BASE_URL", default="http://localhost:11434/v1")
LLM_MODEL = os.getenv("LLM_MODEL", default="deepseek-r1:8b")
LLM_API_KEY = os.getenv("LLM_API_KEY", default="ollama")
LLM_TIMEOUT = int(os.getenv("LLM_TIMEOUT", default="600"))
# Agent Configuration
AGENT_MAX_ITERATIONS = int(os.getenv("AGENT_MAX_ITERATIONS", default="10"))
# Tool Configuration (for future use)
MAX_TOOL_OUTPUT = int(os.getenv("MAX_TOOL_OUTPUT", default="4000"))

89
hendrik.py Normal file
View File

@ -0,0 +1,89 @@
import os, sys, json
import config
from llm_client import LLMClient
from tools import coder
from script import gadget
tools_definition = [
gadget.tools_mapping( coder.schema_read_file, coder.read_file ),
gadget.tools_mapping( coder.schema_write_file, coder.write_file ),
gadget.tools_mapping( coder.schema_edit_file, coder.edit_file ),
gadget.tools_mapping( coder.schema_run_bash, coder.run_bash ),
gadget.tools_mapping( coder.schema_search_code, coder.search_code ),
gadget.tools_mapping( coder.schema_git_operation, coder.git_operation ),
]
TOOLS = [t["schema"] for t in tools_definition] # Schemas
TOOL_HANDLERS = {t["name"]: t["handler"] for t in tools_definition} # Map
SYSTEM_PROMPT = """You are a coding agent that assists with software engineering tasks. You have access to the following tools:
1. read_file: Read file contents with line numbers
2. write_file: Write content to a file (overwrites existing)
3. edit_file: Replace text in a file
4. run_bash: Execute bash commands
5. search_code: Search for files (glob) or file contents (regex)
6. git_operation: Run git commands
Use tools by returning tool calls when needed. After receiving tool results, continue your reasoning. When you have the final answer, return it as plain text without tool calls."""
def agent_loop(user_query, llm_client):
messages = [
{"role": "system" , "content": SYSTEM_PROMPT },
{"role": "user" , "content": user_query }
]
for _ in range(config.AGENT_MAX_ITERATIONS):
response = llm_client.chat(messages, tools=TOOLS)
if response.tool_calls:
assistant_msg = {
"role": "assistant",
"content": response.content,
"tool_calls": response.tool_calls
}
messages.append(assistant_msg)
for tool_call in response.tool_calls:
tool_name = tool_call['function']['name']
tool_args = json.loads(tool_call['function']['arguments'])
handler = TOOL_HANDLERS.get(tool_name)
if not handler:
result = f"Tool {tool_name} not found"
else:
try:
if tool_name == "search_code":
result = handler(
pattern=tool_args["pattern"],
search_type=tool_args["search_type"],
path=tool_args.get("path", ".")
)
elif tool_name == "git_operation":
result = handler(args=tool_args["args"])
else:
result = handler(**tool_args)
except Exception as e:
result = f"Error executing tool: {str(e)}"
messages.append({
"role": "tool",
"tool_call_id": tool_call['id'],
"content": str(result)
})
else:
return response.content
return "Max iterations reached without final answer."
def main():
llm_client = LLMClient(base_url=config.LLM_BASE_URL, model=config.LLM_MODEL, api_key=config.LLM_API_KEY)
if len(sys.argv) > 1:
user_query = " ".join(sys.argv[1:])
else:
print("Enter your query (Ctrl+D to submit):")
user_query = sys.stdin.read().strip()
if not user_query:
print("No query provided.")
return
print("Thinking...")
final_answer = agent_loop(user_query, llm_client)
print("\nFinal Answer:")
print(final_answer)
if __name__ == "__main__":
main()

40
llm_client.py Normal file
View File

@ -0,0 +1,40 @@
import json
import urllib.request
import urllib.error
from config import LLM_BASE_URL, LLM_MODEL, LLM_API_KEY, LLM_TIMEOUT
class LLMClient:
class Message:
def __init__(self, msg):
self.content = msg.get('content', '')
self.tool_calls = msg.get('tool_calls', None)
def __init__(self, base_url=LLM_BASE_URL, model=LLM_MODEL, api_key=LLM_API_KEY):
self.base_url = base_url.rstrip('/')
self.model = model
self.api_key = api_key
def chat(self, messages, tools=None):
url = f"{self.base_url}/chat/completions"
payload = {
"model": self.model,
"messages": messages
}
if tools:
payload["tools"] = tools
payload["tool_choice"] = "auto"
data = json.dumps(payload).encode('utf-8')
req = urllib.request.Request(url, data=data, method='POST')
req.add_header('Content-Type', 'application/json')
req.add_header('Authorization', f'Bearer {self.api_key}')
try:
with urllib.request.urlopen(req, timeout=LLM_TIMEOUT) as resp:
response = json.loads(resp.read().decode('utf-8'))
message = response['choices'][0]['message']
return self.Message(message)
except urllib.error.HTTPError as e:
return self.Message({'content': f"HTTP Error: {e.code} {e.reason}", 'tool_calls': None})
except Exception as e:
return self.Message({'content': f"Error: {str(e)}", 'tool_calls': None})

1
requirements.txt Normal file
View File

@ -0,0 +1 @@
python-dotenv>=1.0.0

0
scripts/__init__.py Normal file
View File

4
scripts/gadget.py Normal file
View File

@ -0,0 +1,4 @@
def tools_mapping(schema, handler, name=None):
tool_name = name or schema["function"]["name"]
return {"name": tool_name, "schema": schema, "handler": handler}

0
tools/__init__.py Normal file
View File

166
tools/coder.py Normal file
View File

@ -0,0 +1,166 @@
import os
import subprocess
import re
import glob as glob_module
schema_read_file = {
"type": "function",
"function": {
"name" : "read_file",
"description" : "Read the contents of a file. Returns the file content with line numbers prefixed.",
"parameters" : {
"type" : "object",
"properties" : {
"path": {"type": "string", "description": "Absolute path to the file to read"}
},
"required" : ["path"]
}
}
}
def read_file(path):
try:
with open(path, 'r', encoding='utf-8', errors='replace') as f:
lines = f.readlines()
return ''.join(f"{i+1}: {line}" for i, line in enumerate(lines))
except Exception as e:
return f"Error reading file: {str(e)}"
schema_write_file = {
"type": "function",
"function": {
"name" : "write_file",
"description" : "Write content to a file. Overwrites the file if it exists.",
"parameters" : {
"type" : "object",
"properties" : {
"path": {"type": "string", "description": "Absolute path to the file to write"},
"content": {"type": "string", "description": "Content to write to the file"}
},
"required" : ["path", "content"]
}
}
}
def write_file(path, content):
try:
with open(path, 'w', encoding='utf-8') as f:
f.write(content)
return f"Successfully wrote to {path}"
except Exception as e:
return f"Error writing file: {str(e)}"
schema_edit_file = {
"type": "function",
"function": {
"name" : "edit_file",
"description" : "Replace old_string with new_string in a file. If old_string is not found, returns error. If multiple matches, replaces all.",
"parameters" : {
"type" : "object",
"properties" : {
"path": {"type": "string", "description": "Absolute path to the file to edit"},
"old_string": {"type": "string", "description": "String to replace"},
"new_string": {"type": "string", "description": "Replacement string"}
},
"required" : ["path", "old_string", "new_string"]
}
}
}
def edit_file(path, old_string, new_string):
try:
with open(path, 'r', encoding='utf-8', errors='replace') as f:
content = f.read()
if old_string not in content:
return f"Error: old_string not found in {path}"
new_content = content.replace(old_string, new_string)
with open(path, 'w', encoding='utf-8') as f:
f.write(new_content)
return f"Successfully edited {path}"
except Exception as e:
return f"Error editing file: {str(e)}"
schema_run_bash = {
"type": "function",
"function": {
"name" : "run_bash",
"description" : "Run a bash command. Returns stdout, stderr, and return code.",
"parameters" : {
"type" : "object",
"properties" : {
"command": {"type": "string", "description": "Bash command to execute"}
},
"required" : ["command"]
}
}
}
def run_bash(command):
try:
result = subprocess.run(command, shell=True, capture_output=True, text=True, timeout=30)
return f"stdout:\n{result.stdout}\nstderr:\n{result.stderr}\nreturn code: {result.returncode}"
except Exception as e:
return f"Error running command: {str(e)}"
schema_search_code = {
"type": "function",
"function": {
"name": "search_code",
"description": "Search for files or content in files. Use type 'glob' to find files matching a pattern, 'content' to search file contents for a regex pattern.",
"parameters": {
"type": "object",
"properties": {
"pattern": {"type": "string", "description": "Glob pattern (for type 'glob') or regex pattern (for type 'content')"},
"path": {"type": "string", "description": "Directory to search in (default: current working directory)", "default": "."},
"search_type": {"type": "string", "enum": ["glob", "content"], "description": "Type of search: 'glob' for files, 'content' for file contents"}
},
"required": ["pattern", "search_type"]
}
}
}
def search_code(pattern, search_type, path="."):
try:
if search_type == "glob":
files = glob_module.glob(f"{path}/**/{pattern}", recursive=True)
return "\n".join(files) if files else "No files found"
elif search_type == "content":
results = []
for root, dirs, files in os.walk(path):
for file in files:
file_path = os.path.join(root, file)
try:
with open(file_path, 'r', encoding='utf-8', errors='replace') as f:
for i, line in enumerate(f.readlines(), 1):
if re.search(pattern, line):
results.append(f"{file_path}:{i}: {line.strip()}")
except:
continue
return "\n".join(results[:50]) if results else "No matches found"
else:
return "Invalid search_type. Use 'glob' or 'content'."
except Exception as e:
return f"Error searching: {str(e)}"
schema_git_operation = {
"type": "function",
"function": {
"name": "git_operation",
"description": "Run a git command. Pass the git arguments as a list (e.g., ['status', '--short'] for 'git status --short').",
"parameters": {
"type": "object",
"properties": {
"args": {"type": "array", "items": {"type": "string"}, "description": "List of git command arguments (without 'git' prefix)"}
},
"required": ["args"]
}
}
}
def git_operation(args):
try:
result = subprocess.run(["git", *args], capture_output=True, text=True, timeout=10)
return f"stdout:\n{result.stdout}\nstderr:\n{result.stderr}\nreturn code: {result.returncode}"
except Exception as e:
return f"Error running git command: {str(e)}"