import asyncio
import json
import signal
import threading
from datetime import datetime

from slixmpp import ClientXMPP

from services.session_manager import SessionManager


def _ts():
    """Return the current local time formatted as ``HH:MM:SS`` for log lines."""
    return datetime.now().strftime('%H:%M:%S')


class XMPPClient(ClientXMPP):
    """XMPP bot that answers direct and group-chat messages via an LLM agent loop.

    Incoming stanzas are received on the asyncio event loop; each message is
    then processed on its own daemon thread (the LLM/tool calls are blocking),
    and every outbound stanza is marshalled back onto the event loop.

    Args:
        jid: JID to log in with; its local part is also used as the MUC nick.
        password: Account password.
        llm_client: Object exposing ``chat(messages, tools=...)`` whose result
            has ``tool_calls`` and ``content`` attributes.
        tools_definition: Passed to ``build_system_prompt`` when a session is
            created.
        TOOLS: Tool schema passed to ``llm_client.chat`` on every call.
        TOOL_HANDLERS: Mapping of tool name to the callable implementing it.
        build_system_prompt: Callable producing the system prompt from
            ``tools_definition``.
        agent_max_iterations: Upper bound on LLM round-trips per user message.
        muc_rooms: Optional iterable of room JIDs to join at session start.
    """

    def __init__(self, jid, password, llm_client, tools_definition, TOOLS,
                 TOOL_HANDLERS, build_system_prompt, agent_max_iterations,
                 muc_rooms=None):
        super().__init__(jid, password)
        self._llm = llm_client
        self._tools_def = tools_definition
        self._TOOLS = TOOLS
        self._TOOL_HANDLERS = TOOL_HANDLERS
        self._build_system_prompt = build_system_prompt
        self._max_iterations = agent_max_iterations
        self._muc_rooms = muc_rooms or []
        self._muc_nick = jid.split('@')[0]
        # Rooms for which our own presence has been seen; messages from rooms
        # not in this set are ignored.
        self._muc_ready: set[str] = set()
        self._session_mgr = SessionManager()
        # Both are populated by _run() once the event loop is running.
        self._loop = None
        self._stopped: asyncio.Event | None = None
        self.auto_reconnect = True

        self.register_plugin('xep_0030')  # Service Discovery
        self.register_plugin('xep_0045')  # Multi-User Chat
        self.register_plugin('xep_0199')  # XMPP Ping

        self.add_event_handler('session_start', self._on_session_start)
        self.add_event_handler('message', self._on_message)
        self.add_event_handler('groupchat_message', self._on_groupchat_message)
        self.add_event_handler('disconnected', self._on_disconnected)
        self.add_event_handler('connected', self._on_connected)
        self.add_event_handler('groupchat_presence', self._on_muc_presence)

    async def _on_connected(self, event):
        """Log that the connection has been established."""
        print(f'[{_ts()}] XMPP connected')

    async def _on_disconnected(self, event):
        """Log that the connection has been lost."""
        print(f'[{_ts()}] XMPP disconnected')

    async def _on_session_start(self, event):
        """Announce presence, request the roster and join the configured rooms.

        A failure to join one room is logged and does not prevent joining the
        remaining rooms.
        """
        self.send_presence()
        # NOTE(review): the result of get_roster() is not awaited here —
        # confirm that fire-and-forget is intended.
        self.get_roster()
        print(f'[{_ts()}] XMPP online as {self.boundjid.full}')
        for room in self._muc_rooms:
            try:
                await self.plugin['xep_0045'].join_muc_wait(
                    room, self._muc_nick, maxstanzas=0
                )
                print(f'[{_ts()}] Joined MUC room: {room}')
            except Exception as e:
                print(f'[{_ts()}] MUC join failed ({room}): {e}')

    def _on_message(self, msg):
        """Dispatch a non-empty ``chat``/``normal`` message to a worker thread."""
        if msg['type'] not in ('chat', 'normal'):
            return
        jid = msg['from'].bare
        body = msg['body'].strip()
        if not body:
            return
        print(f'[{_ts()}] DM from {jid}: {body[:60]}')
        threading.Thread(
            target=self._process_dm, args=(jid, body), daemon=True
        ).start()

    def _on_groupchat_message(self, msg):
        """Dispatch a non-empty group-chat message to a worker thread.

        Messages carrying our own nick, and messages from rooms not yet marked
        ready, are ignored.
        """
        if msg['type'] != 'groupchat':
            return
        nick = msg['from'].resource
        if nick == self._muc_nick:
            return
        room = msg['from'].bare
        if room not in self._muc_ready:
            return
        body = msg['body'].strip()
        if not body:
            return
        print(f'[{_ts()}] MUC [{room}] <{nick}>: {body[:60]}')
        threading.Thread(
            target=self._process_muc, args=(room, nick, body), daemon=True
        ).start()

    def _on_muc_presence(self, presence):
        """Log room presence changes and mark a room ready on our own presence."""
        room = presence['from'].bare
        nick = presence['from'].resource
        ptype = presence['type']
        if nick == self._muc_nick and ptype not in ('unavailable', 'error'):
            self._muc_ready.add(room)
        if ptype == 'unavailable':
            print(f'[{_ts()}] MUC [{room}] <{nick}> left')
        elif ptype == 'error':
            print(f'[{_ts()}] MUC [{room}] error: {presence}')
        else:
            print(f'[{_ts()}] MUC [{room}] <{nick}> joined (type={ptype})')

    def _run_in_loop(self, func, **kwargs):
        """Invoke ``func(**kwargs)`` on the event loop thread when one is set.

        Worker threads must not call stanza-sending methods directly; this
        hands the call to the loop. If no usable loop exists, the call is made
        inline.
        """
        loop = self._loop
        if loop and not loop.is_closed():
            loop.call_soon_threadsafe(lambda: func(**kwargs))
        else:
            func(**kwargs)

    def _process_dm(self, jid, body):
        """Handle one direct message on a worker thread.

        ``:new`` resets the sender's session; any other body is appended to
        the session and run through the agent loop. The session's idle timer
        is cancelled while processing and restarted (300 seconds) afterwards,
        even if the agent loop raises.
        """
        session = self._session_mgr.get_or_create(
            jid, self._build_system_prompt(self._tools_def)
        )
        session.cancel_timer()
        # Sent via the event loop, like every other outbound stanza, because
        # this method runs on a worker thread.
        self._run_in_loop(
            self.send_presence_subscription, pto=jid, ptype='subscribed'
        )
        if body == ':new':
            self._session_mgr.reset(jid)
            print(f'[{_ts()}] Session reset for {jid}')
            self._schedule_send(jid, 'Memulai sesi baru. Ada yang bisa di bantu?')
            return
        session.add_message('user', body)
        self._schedule_send(jid, f'> {body}\nThinking...')
        try:
            self._agent_loop(session, jid, body, 'chat')
        finally:
            # Restart the idle timer even on failure so the session can
            # still expire.
            session.start_timer(300, self._timeout_session, jid, 'chat')

    def _process_muc(self, room, nick, body):
        """Handle one group-chat message on a worker thread.

        The session is keyed by room; user messages are stored prefixed with
        ``[nick]``. ``:new`` resets the room's session. The idle timer is
        cancelled while processing and restarted (300 seconds) afterwards,
        even if the agent loop raises.
        """
        session = self._session_mgr.get_or_create(
            room, self._build_system_prompt(self._tools_def)
        )
        session.cancel_timer()
        if body == ':new':
            self._session_mgr.reset(room)
            print(f'[{_ts()}] Session reset for MUC room {room}')
            self._schedule_send(
                room, 'Memulai sesi baru. Ada yang bisa di bantu?',
                mtype='groupchat',
            )
            return
        prefixed = f'[{nick}] {body}'
        session.add_message('user', prefixed)
        self._schedule_send(room, f'> {prefixed}\nThinking...', mtype='groupchat')
        try:
            self._agent_loop(session, room, prefixed, 'groupchat')
        finally:
            # Restart the idle timer even on failure so the session can
            # still expire.
            session.start_timer(300, self._timeout_session, room, 'groupchat')

    def _agent_loop(self, session, to, quote, mtype):
        """Call the LLM repeatedly, executing requested tools, until it answers.

        Each iteration sends ``session.messages`` to the LLM. If the response
        requests tools, the assistant message and each tool result are
        appended to the session and the loop continues. Otherwise any content
        is sent to ``to`` (quoted with ``quote``) and the loop ends. When
        ``agent_max_iterations`` is exhausted, a fixed notice is recorded and
        sent instead.

        Args:
            session: Session whose ``messages`` list is read and extended.
            to: Recipient JID (user or room).
            quote: Text of the request, echoed as a ``> `` quote in replies.
            mtype: Message type for replies (``'chat'`` or ``'groupchat'``).
        """
        for step in range(self._max_iterations):
            print(f'[{_ts()}] Step {step + 1} — calling LLM...')
            response = self._llm.chat(session.messages, tools=self._TOOLS)

            if not response.tool_calls:
                if response.content:
                    print(f'[{_ts()}] Response sent ({len(response.content)} chars)')
                    session.messages.append(
                        {'role': 'assistant', 'content': response.content}
                    )
                    self._schedule_send(to, f'> {quote}\n{response.content}', mtype)
                # A response without tool calls ends the loop, with or
                # without content.
                return

            session.messages.append({
                'role': 'assistant',
                'content': response.content,
                'tool_calls': response.tool_calls,
            })
            tnames = [tc['function']['name'] for tc in response.tool_calls]
            print(f'[{_ts()}] Using tools: {", ".join(tnames)}')
            self._schedule_send(to, f'> {quote}\nUsing: {", ".join(tnames)}', mtype)
            for tc in response.tool_calls:
                result = self._execute_tool(tc)
                session.messages.append({
                    'role': 'tool',
                    'tool_call_id': tc['id'],
                    'content': str(result),
                })

        print(f'[{_ts()}] Max iterations ({self._max_iterations}) reached')
        session.messages.append({
            'role': 'assistant',
            'content': 'Max iterations reached without final answer.',
        })
        self._schedule_send(
            to, f'> {quote}\nMax iterations reached without final answer.', mtype
        )

    def _execute_tool(self, tool_call):
        """Run the handler for one tool call and return its result.

        Args:
            tool_call: Mapping with ``['function']['name']`` and a JSON string
                in ``['function']['arguments']``.

        Returns:
            The handler's return value, ``'Tool <name> not found'`` for an
            unknown tool, or ``'Error executing tool: <reason>'`` when the
            arguments cannot be parsed or the handler raises.
        """
        tname = tool_call['function']['name']
        handler = self._TOOL_HANDLERS.get(tname)
        if not handler:
            return f'Tool {tname} not found'
        try:
            # Parsed inside the try so malformed JSON is reported back to the
            # model instead of killing the worker thread; empty arguments are
            # treated as "no arguments".
            targs = json.loads(tool_call['function']['arguments'] or '{}')
            if tname == 'search_code':
                return handler(
                    pattern=targs['pattern'],
                    search_type=targs['search_type'],
                    path=targs.get('path', '.'),
                )
            if tname == 'git_operation':
                return handler(args=targs['args'])
            return handler(**targs)
        except Exception as e:
            return f'Error executing tool: {str(e)}'

    def _schedule_send(self, to, body, mtype='chat'):
        """Queue a message for sending on the event loop from any thread.

        Logs a warning and drops the message when no usable loop is set.
        """
        if self._loop and not self._loop.is_closed():
            asyncio.run_coroutine_threadsafe(
                self._send_coro(to, body, mtype), self._loop
            )
        else:
            print(f'[{_ts()}] WARNING: cannot send to {to} — loop unavailable')

    async def _send_coro(self, to, body, mtype):
        """Build and send one message; errors are logged, not raised."""
        try:
            msg = self.make_message(mto=to, mbody=body, mtype=mtype)
            msg.send()
        except Exception as e:
            print(f'[{_ts()}] SEND ERROR: {e}')

    def _timeout_session(self, session_id, mtype):
        """Timer callback: notify the peer and reset the session."""
        print(f'[{_ts()}] Session timeout: {session_id}')
        self._schedule_send(session_id, 'Sesi ditutup. Sampai jumpa', mtype)
        self._session_mgr.reset(session_id)

    def start(self):
        """Run the client until it is stopped (blocks the calling thread)."""
        print(f'[{_ts()}] Starting XMPP service...')
        asyncio.run(self._run())

    async def _run(self):
        """Connect, wait for a stop request or signal, then disconnect."""
        self._stopped = asyncio.Event()
        self._loop = asyncio.get_running_loop()
        for sig_name in ('SIGTERM', 'SIGHUP'):
            # Not every platform defines every signal (e.g. SIGHUP); skip
            # those that are missing rather than failing at startup.
            sig = getattr(signal, sig_name, None)
            if sig is None:
                continue
            try:
                self._loop.add_signal_handler(sig, self._stopped.set)
            except NotImplementedError:
                # The running loop does not support signal handlers.
                pass
        await self.connect()
        try:
            await self._stopped.wait()
        except (asyncio.CancelledError, KeyboardInterrupt):
            pass
        print(f'[{_ts()}] Shutting down...')
        await self.disconnect()

    def stop(self):
        """Request shutdown from any thread; a no-op if the loop is not usable."""
        if self._loop and not self._loop.is_closed():
            asyncio.run_coroutine_threadsafe(self._async_stop(), self._loop)

    async def _async_stop(self):
        """Set the stop event on the event loop thread."""
        if self._stopped is not None:
            self._stopped.set()