hendrik/services/xmpp_client.py
2026-06-08 02:41:29 +07:00

248 lines
9.1 KiB
Python

import asyncio
import json
import signal
import threading
from datetime import datetime
from slixmpp import ClientXMPP
from services.session_manager import SessionManager
def _ts():
    """Return the current local time as an ``HH:MM:SS`` string for log lines."""
    now = datetime.now()
    return f'{now:%H:%M:%S}'
class XMPPClient(ClientXMPP):
    """XMPP bot that feeds direct and group-chat messages to an LLM agent loop.

    Incoming stanzas are handled on the asyncio event loop; each accepted
    message is then processed on its own daemon thread, which calls the LLM,
    executes any requested tools and schedules replies back onto the loop.
    Conversation state is kept per bare JID (or per room JID for MUC) by a
    ``SessionManager``.
    """

    # Idle timeout handed to ``session.start_timer`` after each exchange.
    # NOTE(review): presumably seconds — confirm against SessionManager.
    _SESSION_IDLE_TIMEOUT = 300

    def __init__(self, jid, password, llm_client, tools_definition, TOOLS,
                 TOOL_HANDLERS, build_system_prompt, agent_max_iterations,
                 muc_rooms=None):
        """Configure the client, register plugins and wire event handlers.

        Args:
            jid: Account JID; its local part is also used as the MUC nickname.
            password: Account password.
            llm_client: Object exposing ``chat(messages, tools=...)`` whose
                result has ``tool_calls`` and ``content`` attributes.
            tools_definition: Value passed to ``build_system_prompt``.
            TOOLS: Tool schema passed to ``llm_client.chat`` as ``tools``.
            TOOL_HANDLERS: Mapping of tool name to the callable implementing it.
            build_system_prompt: Callable producing the system prompt for a
                new session from ``tools_definition``.
            agent_max_iterations: Maximum LLM round-trips per user message.
            muc_rooms: Optional list of room JIDs to join on session start.
        """
        super().__init__(jid, password)
        self._llm = llm_client
        self._tools_def = tools_definition
        self._TOOLS = TOOLS
        self._TOOL_HANDLERS = TOOL_HANDLERS
        self._build_system_prompt = build_system_prompt
        self._max_iterations = agent_max_iterations
        self._muc_rooms = muc_rooms or []
        self._muc_nick = jid.split('@')[0]
        # Rooms for which our own presence has been seen; group-chat messages
        # from rooms not in this set are ignored.
        self._muc_ready: set[str] = set()
        self._session_mgr = SessionManager()
        self._loop = None
        self._stopped: asyncio.Event | None = None
        self.auto_reconnect = True
        self.register_plugin('xep_0030')
        self.register_plugin('xep_0045')
        self.register_plugin('xep_0199')
        self.add_event_handler('session_start', self._on_session_start)
        self.add_event_handler('message', self._on_message)
        self.add_event_handler('groupchat_message', self._on_groupchat_message)
        self.add_event_handler('disconnected', self._on_disconnected)
        self.add_event_handler('connected', self._on_connected)
        self.add_event_handler('groupchat_presence', self._on_muc_presence)

    async def _on_connected(self, event):
        """Log that the transport connection has been established."""
        print(f'[{_ts()}] XMPP connected')

    async def _on_disconnected(self, event):
        """Log the disconnect and forget which rooms were joined."""
        # Readiness must be re-established by fresh self-presence after a
        # reconnect; otherwise stale entries would let messages through
        # before the rejoin has completed.
        self._muc_ready.clear()
        print(f'[{_ts()}] XMPP disconnected')

    async def _on_session_start(self, event):
        """Announce presence, request the roster and join configured rooms."""
        self.send_presence()
        self.get_roster()
        print(f'[{_ts()}] XMPP online as {self.boundjid.full}')
        for room in self._muc_rooms:
            try:
                await self.plugin['xep_0045'].join_muc_wait(room, self._muc_nick, maxstanzas=0)
                print(f'[{_ts()}] Joined MUC room: {room}')
            except Exception as e:
                # A failed join must not prevent joining the remaining rooms.
                print(f'[{_ts()}] MUC join failed ({room}): {e}')

    def _on_message(self, msg):
        """Dispatch a non-empty direct message to a worker thread."""
        if msg['type'] not in ('chat', 'normal'):
            return
        jid = msg['from'].bare
        body = msg['body'].strip()
        if not body:
            return
        print(f'[{_ts()}] DM from {jid}: {body[:60]}')
        # The agent loop blocks on the LLM, so it runs off the event loop.
        threading.Thread(target=self._process_dm, args=(jid, body), daemon=True).start()

    def _on_groupchat_message(self, msg):
        """Dispatch a non-empty room message from another occupant to a worker thread."""
        if msg['type'] != 'groupchat':
            return
        nick = msg['from'].resource
        if nick == self._muc_nick:
            # Ignore our own messages echoed back by the room.
            return
        room = msg['from'].bare
        if room not in self._muc_ready:
            return
        body = msg['body'].strip()
        if not body:
            return
        print(f'[{_ts()}] MUC [{room}] <{nick}>: {body[:60]}')
        threading.Thread(target=self._process_muc, args=(room, nick, body), daemon=True).start()

    def _on_muc_presence(self, presence):
        """Track our own room membership and log occupant presence changes."""
        room = presence['from'].bare
        nick = presence['from'].resource
        ptype = presence['type']
        if nick == self._muc_nick:
            if ptype in ('unavailable', 'error'):
                # We left, were removed, or the join failed: stop treating
                # the room as ready until our presence is seen again.
                self._muc_ready.discard(room)
            else:
                self._muc_ready.add(room)
        if ptype == 'unavailable':
            print(f'[{_ts()}] MUC [{room}] <{nick}> left')
        elif ptype == 'error':
            print(f'[{_ts()}] MUC [{room}] error: {presence}')
        else:
            print(f'[{_ts()}] MUC [{room}] <{nick}> joined (type={ptype})')

    def _schedule_subscribed(self, jid):
        """Approve ``jid``'s presence subscription from the event-loop thread."""
        if self._loop and not self._loop.is_closed():
            # Called from worker threads; stanzas are sent on the loop thread,
            # the same way _schedule_send handles messages.
            self._loop.call_soon_threadsafe(
                lambda: self.send_presence_subscription(pto=jid, ptype='subscribed')
            )
        else:
            print(f'[{_ts()}] WARNING: cannot send to {jid} — loop unavailable')

    def _process_dm(self, jid, body):
        """Handle one direct message on a worker thread.

        ``:new`` resets the sender's session; any other text is appended to
        the session, run through the agent loop, and the idle timer is
        re-armed afterwards.
        """
        session = self._session_mgr.get_or_create(
            jid, self._build_system_prompt(self._tools_def)
        )
        session.cancel_timer()
        self._schedule_subscribed(jid)
        if body == ':new':
            self._session_mgr.reset(jid)
            print(f'[{_ts()}] Session reset for {jid}')
            self._schedule_send(jid, 'Memulai sesi baru. Ada yang bisa di bantu?')
            return
        session.add_message('user', body)
        self._schedule_send(jid, f'> {body}\nThinking...')
        self._agent_loop(session, jid, body, 'chat')
        session.start_timer(self._SESSION_IDLE_TIMEOUT, self._timeout_session, jid, 'chat')

    def _process_muc(self, room, nick, body):
        """Handle one room message on a worker thread.

        The session is keyed by room JID and user text is prefixed with the
        sender's nickname. ``:new`` resets the room's session.
        """
        session = self._session_mgr.get_or_create(
            room, self._build_system_prompt(self._tools_def)
        )
        session.cancel_timer()
        if body == ':new':
            self._session_mgr.reset(room)
            print(f'[{_ts()}] Session reset for MUC room {room}')
            self._schedule_send(room, 'Memulai sesi baru. Ada yang bisa di bantu?', mtype='groupchat')
            return
        prefixed = f'[{nick}] {body}'
        session.add_message('user', prefixed)
        self._schedule_send(room, f'> {prefixed}\nThinking...', mtype='groupchat')
        self._agent_loop(session, room, prefixed, 'groupchat')
        session.start_timer(self._SESSION_IDLE_TIMEOUT, self._timeout_session, room, 'groupchat')

    def _agent_loop(self, session, to, quote, mtype):
        """Run LLM/tool round-trips until a final answer or the iteration cap.

        Args:
            session: Session whose ``messages`` list is sent to the LLM and
                extended with assistant and tool messages.
            to: JID (user or room) that receives progress and final replies.
            quote: Text quoted at the top of every reply.
            mtype: Message type for replies (``'chat'`` or ``'groupchat'``).
        """
        for step in range(self._max_iterations):
            print(f'[{_ts()}] Step {step + 1} — calling LLM...')
            try:
                response = self._llm.chat(session.messages, tools=self._TOOLS)
            except Exception as e:
                # Without this the worker thread would die silently and the
                # user would be left on "Thinking..." with no idle timer.
                print(f'[{_ts()}] LLM ERROR: {e}')
                self._schedule_send(to, f'> {quote}\nError: {e}', mtype)
                return
            if response.tool_calls:
                amsg = {
                    'role': 'assistant',
                    'content': response.content,
                    'tool_calls': response.tool_calls,
                }
                session.messages.append(amsg)
                tnames = [tc['function']['name'] for tc in response.tool_calls]
                print(f'[{_ts()}] Using tools: {", ".join(tnames)}')
                self._schedule_send(to, f'> {quote}\nUsing: {", ".join(tnames)}', mtype)
                for tc in response.tool_calls:
                    result = self._execute_tool(tc)
                    session.messages.append({
                        'role': 'tool',
                        'tool_call_id': tc['id'],
                        'content': str(result),
                    })
            else:
                if response.content:
                    print(f'[{_ts()}] Response sent ({len(response.content)} chars)')
                    session.messages.append({'role': 'assistant', 'content': response.content})
                    self._schedule_send(to, f'> {quote}\n{response.content}', mtype)
                return
        print(f'[{_ts()}] Max iterations ({self._max_iterations}) reached')
        session.messages.append({
            'role': 'assistant',
            'content': 'Max iterations reached without final answer.',
        })
        self._schedule_send(to, f'> {quote}\nMax iterations reached without final answer.', mtype)

    def _execute_tool(self, tool_call):
        """Invoke the handler for one tool call and return its result.

        Failures (unknown tool, undecodable arguments, handler exceptions)
        are returned as descriptive strings rather than raised, so they can
        be passed back to the LLM as the tool result.

        Args:
            tool_call: Mapping with ``function.name`` and
                ``function.arguments`` (a JSON string or a decoded mapping).
        """
        tname = tool_call['function']['name']
        handler = self._TOOL_HANDLERS.get(tname)
        if not handler:
            return f'Tool {tname} not found'
        try:
            raw_args = tool_call['function']['arguments']
            if isinstance(raw_args, str):
                # An empty argument string means "no arguments".
                targs = json.loads(raw_args) if raw_args.strip() else {}
            else:
                targs = raw_args or {}
            if tname == 'search_code':
                return handler(
                    pattern=targs['pattern'],
                    search_type=targs['search_type'],
                    path=targs.get('path', '.'),
                )
            elif tname == 'git_operation':
                return handler(args=targs['args'])
            else:
                return handler(**targs)
        except Exception as e:
            return f'Error executing tool: {str(e)}'

    def _schedule_send(self, to, body, mtype='chat'):
        """Queue a message for sending on the event loop (callable from any thread)."""
        if self._loop and not self._loop.is_closed():
            asyncio.run_coroutine_threadsafe(
                self._send_coro(to, body, mtype), self._loop
            )
        else:
            print(f'[{_ts()}] WARNING: cannot send to {to} — loop unavailable')

    async def _send_coro(self, to, body, mtype):
        """Build and send one message stanza, logging instead of raising on failure."""
        try:
            msg = self.make_message(mto=to, mbody=body, mtype=mtype)
            msg.send()
        except Exception as e:
            print(f'[{_ts()}] SEND ERROR: {e}')

    def _timeout_session(self, session_id, mtype):
        """Timer callback: say goodbye and reset the idle session."""
        print(f'[{_ts()}] Session timeout: {session_id}')
        self._schedule_send(session_id, 'Sesi ditutup. Sampai jumpa', mtype)
        self._session_mgr.reset(session_id)

    def start(self):
        """Run the client until stopped; blocks the calling thread."""
        print(f'[{_ts()}] Starting XMPP service...')
        asyncio.run(self._run())

    async def _run(self):
        """Connect, wait for a stop request or signal, then disconnect."""
        self._stopped = asyncio.Event()
        self._loop = asyncio.get_running_loop()
        for sig_name in ('SIGTERM', 'SIGHUP'):
            # Not every platform defines every signal, so resolve by name
            # instead of touching the attribute directly.
            sig = getattr(signal, sig_name, None)
            if sig is None:
                continue
            try:
                self._loop.add_signal_handler(sig, self._stopped.set)
            except (NotImplementedError, RuntimeError):
                # NotImplementedError: loop has no signal support;
                # RuntimeError: loop is not running in the main thread.
                pass
        await self.connect()
        try:
            await self._stopped.wait()
        except (asyncio.CancelledError, KeyboardInterrupt):
            pass
        print(f'[{_ts()}] Shutting down...')
        await self.disconnect()

    def stop(self):
        """Request shutdown from any thread; no-op if the loop is not running."""
        if self._loop and not self._loop.is_closed():
            asyncio.run_coroutine_threadsafe(self._async_stop(), self._loop)

    async def _async_stop(self):
        """Set the stop event from within the event loop."""
        self._stopped.set()