From 23348440b1e76c82dcd2142d4d1ed9d9469d647c Mon Sep 17 00:00:00 2001 From: Dita Aji Pratama Date: Mon, 8 Jun 2026 02:41:29 +0700 Subject: [PATCH] XMPP feature --- .env.example | 10 +- config.py | 5 + hendrik.py | 42 ++++-- requirements.txt | 1 + services/__init__.py | 0 services/session_manager.py | 48 +++++++ services/xmpp_client.py | 247 ++++++++++++++++++++++++++++++++++++ 7 files changed, 339 insertions(+), 14 deletions(-) create mode 100644 services/__init__.py create mode 100644 services/session_manager.py create mode 100644 services/xmpp_client.py diff --git a/.env.example b/.env.example index f99b044..feb5092 100644 --- a/.env.example +++ b/.env.example @@ -5,10 +5,12 @@ LLM_BASE_URL=http://localhost:11434/v1 LLM_MODEL=deepseek-r1:8b LLM_API_KEY=ollama - -# Agent Configuration AGENT_MAX_ITERATIONS=10 - -# Tool Configuration MAX_TOOL_OUTPUT=4000 +# XMPP (default: disabled) +XMPP_ENABLED=False +XMPP_USERNAME= +XMPP_PASSWORD= +# XMPP_MUC_ROOMS=room1@conference.server,room2@conference.server + diff --git a/config.py b/config.py index 930667d..0ef0421 100644 --- a/config.py +++ b/config.py @@ -15,3 +15,8 @@ MAX_TOOL_OUTPUT = int( os.getenv("MAX_TOOL_OUTPUT", default="4000" # RAG Configuration RAG_PERSIST_DIR = os.getenv("RAG_PERSIST_DIR", default="chroma_db" ) # Embedding: ChromaDB ONNX default (all-MiniLM-L6-v2, lokal, tidak perlu API call) +# XMPP Configuration +XMPP_ENABLED = os.getenv("XMPP_ENABLED", default="False" ).strip().lower() in ("true", "1", "yes") +XMPP_USERNAME = os.getenv("XMPP_USERNAME", default="" ) +XMPP_PASSWORD = os.getenv("XMPP_PASSWORD", default="" ) +XMPP_MUC_ROOMS = os.getenv("XMPP_MUC_ROOMS", default="" ) diff --git a/hendrik.py b/hendrik.py index e6be11c..719c75b 100644 --- a/hendrik.py +++ b/hendrik.py @@ -4,7 +4,6 @@ import config from scripts.llm_client import LLMClient from tools import coder, rag from scripts import gadget -from tui import HendrikTUI # Daftar tools yang tersedia tools_definition = [ @@ -26,9 +25,10 @@ tools_definition = [ TOOLS = gadget.tool_schemas (tools_definition) TOOL_HANDLERS = gadget.tool_handlers (tools_definition) + def main(): llm_client = LLMClient(config.llm_baseurl, config.llm_model, config.llm_api_key, config.llm_timeout) - + # Parsing arguments `-w ` atau `--workspace ` workspace = None i = 1 @@ -47,14 +47,36 @@ def main(): sys.exit(1) os.chdir(resolved) - HendrikTUI( - llm_client = llm_client, - tools_definition = tools_definition, - TOOLS = TOOLS, - TOOL_HANDLERS = TOOL_HANDLERS, - build_system_prompt = gadget.build_system_prompt, - agent_max_iterations = config.AGENT_MAX_ITERATIONS, - ).run() # Luncurkan TUI + if config.XMPP_ENABLED: + from services.xmpp_client import XMPPClient + + muc_rooms = [] + if config.XMPP_MUC_ROOMS.strip(): + muc_rooms = [r.strip() for r in config.XMPP_MUC_ROOMS.split(',') if r.strip()] + + client = XMPPClient( + jid = config.XMPP_USERNAME, + password = config.XMPP_PASSWORD, + llm_client = llm_client, + tools_definition = tools_definition, + TOOLS = TOOLS, + TOOL_HANDLERS = TOOL_HANDLERS, + build_system_prompt = gadget.build_system_prompt, + agent_max_iterations= config.AGENT_MAX_ITERATIONS, + muc_rooms = muc_rooms, + ) + client.start() # blocking, headless + else: + from tui import HendrikTUI + HendrikTUI( + llm_client = llm_client, + tools_definition = tools_definition, + TOOLS = TOOLS, + TOOL_HANDLERS = TOOL_HANDLERS, + build_system_prompt = gadget.build_system_prompt, + agent_max_iterations = config.AGENT_MAX_ITERATIONS, + ).run() + if __name__ == "__main__": main() diff --git a/requirements.txt b/requirements.txt index 3c2f1aa..db402cb 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,3 @@ python-dotenv>=1.0.0 chromadb>=0.5.0 +slixmpp diff --git a/services/__init__.py b/services/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/services/session_manager.py b/services/session_manager.py new file mode 100644 index 0000000..0a690f8 --- /dev/null +++ b/services/session_manager.py @@ -0,0 +1,48 @@ +import threading +import time + +class Session: + def __init__(self, session_id: str, system_prompt: str): + self.session_id = session_id + self.messages = [{"role": "system", "content": system_prompt}] + self.last_activity = time.monotonic() + self._timer: threading.Timer | None = None + + def add_message(self, role: str, content: str, **kwargs): + msg = {"role": role, "content": content} + msg.update(kwargs) + self.messages.append(msg) + + def cancel_timer(self): + if self._timer: + self._timer.cancel() + self._timer = None + + def start_timer(self, timeout: float, callback, *args): + self.cancel_timer() + self._timer = threading.Timer(timeout, callback, args) + self._timer.daemon = True + self._timer.start() + +class SessionManager: + def __init__(self): + self._sessions: dict[str, Session] = {} + self._lock = threading.Lock() + + def get_or_create(self, session_id: str, system_prompt: str) -> Session: + with self._lock: + if session_id not in self._sessions: + self._sessions[session_id] = Session(session_id, system_prompt) + return self._sessions[session_id] + + def reset(self, session_id: str): + with self._lock: + session = self._sessions.pop(session_id, None) + if session: + session.cancel_timer() + + def cleanup_all(self): + with self._lock: + for session in self._sessions.values(): + session.cancel_timer() + self._sessions.clear() diff --git a/services/xmpp_client.py b/services/xmpp_client.py new file mode 100644 index 0000000..313a691 --- /dev/null +++ b/services/xmpp_client.py @@ -0,0 +1,247 @@ +import asyncio +import json +import signal +import threading +from datetime import datetime + +from slixmpp import ClientXMPP +from services.session_manager import SessionManager + + +def _ts(): + return datetime.now().strftime('%H:%M:%S') + + +class XMPPClient(ClientXMPP): + def __init__(self, jid, password, llm_client, tools_definition, TOOLS, + TOOL_HANDLERS, build_system_prompt, agent_max_iterations, + muc_rooms=None): + super().__init__(jid, password) + + self._llm = llm_client + self._tools_def = tools_definition + self._TOOLS = TOOLS + self._TOOL_HANDLERS = TOOL_HANDLERS + self._build_system_prompt = build_system_prompt + self._max_iterations = agent_max_iterations + self._muc_rooms = muc_rooms or [] + self._muc_nick = jid.split('@')[0] + self._muc_ready: set[str] = set() + + self._session_mgr = SessionManager() + self._loop = None + self._stopped: asyncio.Event | None = None + + self.auto_reconnect = True + + self.register_plugin('xep_0030') + self.register_plugin('xep_0045') + self.register_plugin('xep_0199') + + self.add_event_handler('session_start', self._on_session_start) + self.add_event_handler('message', self._on_message) + self.add_event_handler('groupchat_message', self._on_groupchat_message) + self.add_event_handler('disconnected', self._on_disconnected) + self.add_event_handler('connected', self._on_connected) + self.add_event_handler('groupchat_presence', self._on_muc_presence) + + async def _on_connected(self, event): + print(f'[{_ts()}] XMPP connected') + + async def _on_disconnected(self, event): + print(f'[{_ts()}] XMPP disconnected') + + async def _on_session_start(self, event): + self.send_presence() + self.get_roster() + print(f'[{_ts()}] XMPP online as {self.boundjid.full}') + for room in self._muc_rooms: + try: + await self.plugin['xep_0045'].join_muc_wait(room, self._muc_nick, maxstanzas=0) + print(f'[{_ts()}] Joined MUC room: {room}') + except Exception as e: + print(f'[{_ts()}] MUC join failed ({room}): {e}') + + def _on_message(self, msg): + if msg['type'] not in ('chat', 'normal'): + return + jid = msg['from'].bare + body = msg['body'].strip() + if not body: + return + print(f'[{_ts()}] DM from {jid}: {body[:60]}') + threading.Thread(target=self._process_dm, args=(jid, body), daemon=True).start() + + def _on_groupchat_message(self, msg): + if msg['type'] != 'groupchat': + return + nick = msg['from'].resource + if nick == self._muc_nick: + return + room = msg['from'].bare + if room not in self._muc_ready: + return + body = msg['body'].strip() + if not body: + return + print(f'[{_ts()}] MUC [{room}] <{nick}>: {body[:60]}') + threading.Thread(target=self._process_muc, args=(room, nick, body), daemon=True).start() + + def _on_muc_presence(self, presence): + room = presence['from'].bare + nick = presence['from'].resource + ptype = presence['type'] + if nick == self._muc_nick and ptype not in ('unavailable', 'error'): + self._muc_ready.add(room) + if ptype == 'unavailable': + print(f'[{_ts()}] MUC [{room}] <{nick}> left') + elif ptype == 'error': + print(f'[{_ts()}] MUC [{room}] error: {presence}') + else: + print(f'[{_ts()}] MUC [{room}] <{nick}> joined (type={ptype})') + + def _process_dm(self, jid, body): + session = self._session_mgr.get_or_create( + jid, self._build_system_prompt(self._tools_def) + ) + session.cancel_timer() + + self.send_presence_subscription(pto=jid, ptype='subscribed') + + if body == ':new': + self._session_mgr.reset(jid) + print(f'[{_ts()}] Session reset for {jid}') + self._schedule_send(jid, 'Memulai sesi baru. Ada yang bisa di bantu?') + return + + session.add_message('user', body) + + self._schedule_send(jid, f'> {body}\nThinking...') + self._agent_loop(session, jid, body, 'chat') + + session.start_timer(300, self._timeout_session, jid, 'chat') + + def _process_muc(self, room, nick, body): + session = self._session_mgr.get_or_create( + room, self._build_system_prompt(self._tools_def) + ) + session.cancel_timer() + + if body == ':new': + self._session_mgr.reset(room) + print(f'[{_ts()}] Session reset for MUC room {room}') + self._schedule_send(room, 'Memulai sesi baru. Ada yang bisa di bantu?', mtype='groupchat') + return + + prefixed = f'[{nick}] {body}' + session.add_message('user', prefixed) + + self._schedule_send(room, f'> [{nick}] {body}\nThinking...', mtype='groupchat') + self._agent_loop(session, room, f'[{nick}] {body}', 'groupchat') + + session.start_timer(300, self._timeout_session, room, 'groupchat') + + def _agent_loop(self, session, to, quote, mtype): + for step in range(self._max_iterations): + print(f'[{_ts()}] Step {step + 1} — calling LLM...') + response = self._llm.chat(session.messages, tools=self._TOOLS) + + if response.tool_calls: + amsg = { + 'role': 'assistant', + 'content': response.content, + 'tool_calls': response.tool_calls, + } + session.messages.append(amsg) + + tnames = [tc['function']['name'] for tc in response.tool_calls] + print(f'[{_ts()}] Using tools: {", ".join(tnames)}') + self._schedule_send(to, f'> {quote}\nUsing: {", ".join(tnames)}', mtype) + + for tc in response.tool_calls: + result = self._execute_tool(tc) + session.messages.append({ + 'role': 'tool', + 'tool_call_id': tc['id'], + 'content': str(result), + }) + else: + if response.content: + print(f'[{_ts()}] Response sent ({len(response.content)} chars)') + session.messages.append({'role': 'assistant', 'content': response.content}) + self._schedule_send(to, f'> {quote}\n{response.content}', mtype) + return + + print(f'[{_ts()}] Max iterations ({self._max_iterations}) reached') + session.messages.append({ + 'role': 'assistant', + 'content': 'Max iterations reached without final answer.', + }) + self._schedule_send(to, f'> {quote}\nMax iterations reached without final answer.', mtype) + + def _execute_tool(self, tool_call): + tname = tool_call['function']['name'] + targs = json.loads(tool_call['function']['arguments']) + handler = self._TOOL_HANDLERS.get(tname) + if not handler: + return f'Tool {tname} not found' + try: + if tname == 'search_code': + return handler( + pattern=targs['pattern'], + search_type=targs['search_type'], + path=targs.get('path', '.'), + ) + elif tname == 'git_operation': + return handler(args=targs['args']) + else: + return handler(**targs) + except Exception as e: + return f'Error executing tool: {str(e)}' + + def _schedule_send(self, to, body, mtype='chat'): + if self._loop and not self._loop.is_closed(): + asyncio.run_coroutine_threadsafe( + self._send_coro(to, body, mtype), self._loop + ) + else: + print(f'[{_ts()}] WARNING: cannot send to {to} — loop unavailable') + + async def _send_coro(self, to, body, mtype): + try: + msg = self.make_message(mto=to, mbody=body, mtype=mtype) + msg.send() + except Exception as e: + print(f'[{_ts()}] SEND ERROR: {e}') + + def _timeout_session(self, session_id, mtype): + print(f'[{_ts()}] Session timeout: {session_id}') + self._schedule_send(session_id, 'Sesi ditutup. Sampai jumpa', mtype) + self._session_mgr.reset(session_id) + + def start(self): + print(f'[{_ts()}] Starting XMPP service...') + asyncio.run(self._run()) + + async def _run(self): + self._stopped = asyncio.Event() + self._loop = asyncio.get_running_loop() + for sig in (signal.SIGTERM, signal.SIGHUP): + try: + self._loop.add_signal_handler(sig, self._stopped.set) + except NotImplementedError: + pass + await self.connect() + try: + await self._stopped.wait() + except (asyncio.CancelledError, KeyboardInterrupt): + pass + print(f'[{_ts()}] Shutting down...') + await self.disconnect() + + def stop(self): + if self._loop and not self._loop.is_closed(): + asyncio.run_coroutine_threadsafe(self._async_stop(), self._loop) + + async def _async_stop(self): + self._stopped.set()