- Add XMPP_NICKNAME config in .env for custom MUC nick (fallback to JID username) - Add _get_muc_nick() helper: resolve nick with suffix fallback on conflict - Add _is_my_nick() helper: compare presence nick with expected nick per room - Handle 409 Conflict in _on_session_start: try nick alternatives (lily_1, lily_2) - Handle 409 Conflict in _muc_rejoin_coro: try nick alternatives with max 3 attempts - Stop retry when all nick variations exhausted (anti-ban: avoid infinite retry) - Reset nick suffix counter on successful join - Update _on_groupchat_message filter to use _is_my_nick()
438 lines
19 KiB
Python
438 lines
19 KiB
Python
import asyncio
|
|
import json
|
|
import random
|
|
import signal
|
|
import threading
|
|
from datetime import datetime
|
|
|
|
from slixmpp import ClientXMPP
|
|
from services.session_manager import SessionManager
|
|
|
|
import config
|
|
|
|
# Anti-ban: timing and retry limits that govern MUC rejoin behaviour.

# Seconds to wait before the first rejoin attempt.
MUC_REJOIN_INITIAL_DELAY = 5.0

# Multiplier applied per attempt for exponential backoff.
MUC_REJOIN_BACKOFF_MULT = 2.0

# Upper bound for the backoff delay, in seconds (5 minutes).
MUC_REJOIN_MAX_DELAY = 300.0

# Minimum number of seconds between two rejoin attempts.
MUC_REJOIN_COOLDOWN = 10.0

# Maximum number of alternative nicks to try (anti-ban: keep this small).
MUC_NICK_SUFFIX_MAX = 3
|
|
|
|
|
|
def _ts():
    """Return the current local time as an ``HH:MM:SS`` string for log prefixes."""
    now = datetime.now()
    return f'{now:%H:%M:%S}'
|
|
|
|
|
|
def _typing_delay(text: str) -> float:
    """Return the simulated typing delay in seconds, proportional to the text length.

    The delay is ``len(text) / config.TYPING_SPEED``, capped at
    ``config.TYPING_MAX`` and never shorter than one second. Empty or
    ``None`` text counts as zero characters.
    """
    length = len(text) if text else 0
    seconds = length / config.TYPING_SPEED
    capped = min(seconds, config.TYPING_MAX)
    return max(1.0, capped)
|
|
|
|
|
|
async def _read_delay():
    """Sleep for a random interval to simulate reading the user's message.

    The interval is drawn uniformly between ``config.READ_DELAY_MIN`` and
    ``config.READ_DELAY_MAX``.
    """
    lower, upper = config.READ_DELAY_MIN, config.READ_DELAY_MAX
    await asyncio.sleep(random.uniform(lower, upper))
|
|
|
|
|
|
class XMPPClient(ClientXMPP):
    """XMPP client that forwards direct and group-chat messages to an LLM agent.

    Incoming messages are handed to worker threads which run the agent loop
    (LLM call, optional tool execution) and schedule replies back onto the
    asyncio event loop. MUC rooms are joined on session start and rejoined
    with exponential backoff, a cooldown, and a bounded number of alternative
    nicks when the preferred nick is already taken.
    """

    def __init__(self, jid, password, llm_client, tools_definition, TOOLS,
                 TOOL_HANDLERS, build_system_prompt, agent_max_iterations,
                 muc_rooms=None):
        """Store collaborators, register plugins and wire up event handlers.

        Args:
            jid: Account JID; its local part is the fallback MUC nick.
            password: Account password.
            llm_client: Object exposing ``chat(messages, tools=...)``.
            tools_definition: Passed to ``build_system_prompt``.
            TOOLS: Tool schema forwarded to the LLM on every call.
            TOOL_HANDLERS: Mapping of tool name to callable.
            build_system_prompt: Callable producing the session system prompt.
            agent_max_iterations: Maximum LLM round-trips per user message.
            muc_rooms: Optional list of room JIDs to join on session start.
        """
        super().__init__(jid, password)

        self._llm = llm_client
        self._tools_def = tools_definition
        self._TOOLS = TOOLS
        self._TOOL_HANDLERS = TOOL_HANDLERS
        self._build_system_prompt = build_system_prompt
        self._max_iterations = agent_max_iterations
        self._muc_rooms = muc_rooms or []
        # Custom nick from config, falling back to the JID local part.
        # A missing or None XMPP_NICKNAME is treated as "not configured".
        configured_nick = (getattr(config, 'XMPP_NICKNAME', None) or '').strip()
        self._muc_nick = configured_nick or jid.split('@')[0]
        self._muc_nick_suffix = 0  # retained for compatibility; per-room counters are used instead
        # room -> nick used for the most recent join attempt. This is what the
        # bot is actually called in the room, even after the suffix counter has
        # been reset following a successful join.
        self._muc_active_nick: dict[str, str] = {}
        self._muc_ready: set[str] = set()

        self._session_mgr = SessionManager()
        self._loop = None
        self._stopped: asyncio.Event | None = None

        # Anti-ban: per-room rejoin tracking.
        # room -> number of rejoin attempts; also holds "_nick_<room>" -> nick suffix counter.
        self._muc_rejoin_attempts: dict[str, int] = {}
        # room -> future returned by asyncio.run_coroutine_threadsafe for the pending rejoin.
        self._muc_rejoin_tasks: dict = {}
        # room -> time of the last successful join (used for the cooldown).
        self._muc_last_join: dict[str, datetime] = {}

        self.auto_reconnect = True

        self.register_plugin('xep_0030')
        self.register_plugin('xep_0045')
        self.register_plugin('xep_0199')

        self.add_event_handler('session_start', self._on_session_start)
        self.add_event_handler('message', self._on_message)
        self.add_event_handler('groupchat_message', self._on_groupchat_message)
        self.add_event_handler('disconnected', self._on_disconnected)
        self.add_event_handler('connected', self._on_connected)
        self.add_event_handler('groupchat_presence', self._on_muc_presence)

    # ------------------------------------------------------------------
    # Nick handling
    # ------------------------------------------------------------------

    def _base_nick(self) -> str:
        """Return the preferred nick: the configured one, else the JID-derived one."""
        configured_nick = (getattr(config, 'XMPP_NICKNAME', None) or '').strip()
        return configured_nick or self._muc_nick

    def _get_muc_nick(self, room: str) -> str:
        """Anti-ban: resolve the nick to use for the next join attempt in *room*.

        Returns the base nick while no conflict has been recorded, otherwise
        ``<base>_<n>`` where ``n`` is the room's current suffix counter.
        """
        base = self._base_nick()
        suffix = self._muc_rejoin_attempts.get("_nick_" + room, 0)
        if suffix == 0:
            return base
        # Anti-ban: append a numeric suffix to get around a 409 Conflict.
        return f"{base}_{suffix}"

    def _next_nick_variant(self, room: str) -> bool:
        """Advance the nick suffix counter for *room*.

        Returns:
            True when a further variant is available (the counter was bumped),
            False when ``MUC_NICK_SUFFIX_MAX`` variants have already been used.
        """
        key = "_nick_" + room
        used = self._muc_rejoin_attempts.get(key, 0)
        if used >= MUC_NICK_SUFFIX_MAX:
            return False
        self._muc_rejoin_attempts[key] = used + 1
        return True

    @staticmethod
    def _is_nick_conflict(exc) -> bool:
        """Return True when *exc* looks like a MUC nickname conflict (409).

        Checks a ``condition`` attribute first, if the exception has one, and
        falls back to the exception's string form, which may be empty.
        """
        if getattr(exc, 'condition', None) == 'conflict':
            return True
        text = str(exc)
        return '409' in text or 'conflict' in text.lower()

    def _is_my_nick(self, room: str, nick: str) -> bool:
        """Return True when *nick* is the bot's own nick in *room*.

        The nick recorded for the latest join attempt is authoritative. The
        suffix counter is reset after a successful join, so recomputing the
        nick from it would no longer match a fallback nick, and comparing with
        the base nick would match the occupant who caused the conflict.
        """
        active = self._muc_active_nick.get(room)
        if active is None:
            # No join attempted yet for this room: fall back to the computed nick.
            active = self._get_muc_nick(room)
        return nick == active

    # ------------------------------------------------------------------
    # MUC rejoin scheduling
    # ------------------------------------------------------------------

    def _calc_rejoin_delay(self, room: str) -> float:
        """Anti-ban: compute the rejoin delay using exponential backoff."""
        attempts = self._muc_rejoin_attempts.get(room, 0)
        delay = MUC_REJOIN_INITIAL_DELAY * (MUC_REJOIN_BACKOFF_MULT ** attempts)
        return min(delay, MUC_REJOIN_MAX_DELAY)

    def _schedule_muc_rejoin(self, room: str):
        """Anti-ban: schedule a rejoin of *room* honouring backoff and cooldown."""
        # Cancel a pending rejoin for the same room so attempts never overlap.
        pending = self._muc_rejoin_tasks.get(room)
        if pending and not pending.done():
            pending.cancel()
            print(f'[{_ts()}] MUC [{room}] Cancelled pending rejoin (new trigger)')

        delay = self._calc_rejoin_delay(room)

        # Cooldown: do not rejoin too soon after the last successful join.
        last_join = self._muc_last_join.get(room)
        if last_join:
            elapsed = (datetime.now() - last_join).total_seconds()
            if elapsed < MUC_REJOIN_COOLDOWN:
                cooldown_left = MUC_REJOIN_COOLDOWN - elapsed
                print(f'[{_ts()}] MUC [{room}] Cooldown active ({cooldown_left:.0f}s left), delaying rejoin')
                delay += cooldown_left

        # Track the attempt so the next delay grows exponentially.
        attempts = self._muc_rejoin_attempts.get(room, 0) + 1
        self._muc_rejoin_attempts[room] = attempts

        print(f'[{_ts()}] MUC [{room}] Rejoin scheduled in {delay:.0f}s (attempt #{attempts})')

        if self._loop and not self._loop.is_closed():
            task = asyncio.run_coroutine_threadsafe(
                self._muc_rejoin_coro(room, delay), self._loop
            )
            self._muc_rejoin_tasks[room] = task
        else:
            print(f'[{_ts()}] WARNING: cannot rejoin {room} — loop unavailable')

    async def _muc_rejoin_coro(self, room: str, delay: float):
        """Anti-ban: wait *delay* seconds, then try to rejoin *room* once.

        On a nick conflict the next nick variant is selected and another
        rejoin is scheduled; once all variants are used the room is skipped.
        Any other failure schedules another rejoin with backoff.
        """
        try:
            await asyncio.sleep(delay)
            # Double-check: skip when the room became ready in the meantime.
            if room in self._muc_ready:
                print(f'[{_ts()}] MUC [{room}] Already ready, skip rejoin')
                return
            nick = self._get_muc_nick(room)
            print(f'[{_ts()}] MUC [{room}] Rejoining as {nick}...')
            # Record the nick before joining so our own presence is recognised.
            self._muc_active_nick[room] = nick
            await self.plugin['xep_0045'].join_muc_wait(room, nick, maxstanzas=0)
            self._muc_last_join[room] = datetime.now()
            # _muc_ready is set by _on_muc_presence once our presence arrives.
            self._muc_rejoin_attempts.pop(room, None)
            self._muc_rejoin_attempts.pop("_nick_" + room, None)
            print(f'[{_ts()}] MUC [{room}] Rejoin successful as {nick}')
        except asyncio.CancelledError:
            print(f'[{_ts()}] MUC [{room}] Rejoin cancelled')
        except Exception as e:
            print(f'[{_ts()}] MUC [{room}] Rejoin failed: {e}')
            if self._is_nick_conflict(e):
                # Anti-ban: 409 Conflict - the nick is already used by someone else.
                if self._next_nick_variant(room):
                    new_nick = self._get_muc_nick(room)
                    print(f'[{_ts()}] MUC [{room}] Nick conflict, trying alternative: {new_nick}')
                    # Retry with the new nick; the usual scheduling delay still applies.
                    self._schedule_muc_rejoin(room)
                else:
                    # Anti-ban: every nick variant was tried; stop to avoid a ban.
                    print(f'[{_ts()}] MUC [{room}] All nick variations exhausted, skipping room')
                    print(f'[{_ts()}] MUC [{room}] Set XMPP_NICKNAME in .env to a unique nick')
            else:
                # Anti-ban: ordinary error (network, etc.) - retry with backoff.
                self._schedule_muc_rejoin(room)

    # ------------------------------------------------------------------
    # Connection lifecycle events
    # ------------------------------------------------------------------

    async def _on_connected(self, event):
        """Log that the connection was established."""
        print(f'[{_ts()}] XMPP connected')

    async def _on_disconnected(self, event):
        """Log the disconnect and drop all state tied to the lost session."""
        print(f'[{_ts()}] XMPP disconnected')
        # Anti-ban: cancel all pending rejoin tasks on disconnect.
        for room, task in list(self._muc_rejoin_tasks.items()):
            if not task.done():
                task.cancel()
                print(f'[{_ts()}] MUC [{room}] Cancelled pending rejoin (disconnected)')
        self._muc_rejoin_tasks.clear()
        # Rooms are rejoined on the next session start, so none are ready now.
        self._muc_ready.clear()

    async def _on_session_start(self, event):
        """Announce presence, request the roster and join the configured rooms.

        Each room gets up to three immediate attempts. A nick conflict switches
        to the next nick variant without waiting; other errors wait 2s then 4s.
        If the attempts run out a background rejoin is scheduled, unless the
        nick variants are exhausted, in which case the room is left alone.
        """
        self.send_presence()
        self.get_roster()
        print(f'[{_ts()}] XMPP online as {self.boundjid.full}')
        for room in self._muc_rooms:
            joined = False
            nicks_exhausted = False
            for attempt in range(1, 4):
                nick = self._get_muc_nick(room)
                # Record the nick before joining so our own presence is recognised.
                self._muc_active_nick[room] = nick
                try:
                    await self.plugin['xep_0045'].join_muc_wait(room, nick, maxstanzas=0)
                except Exception as e:
                    print(f'[{_ts()}] MUC join attempt #{attempt} failed ({room}): {e}')
                    if self._is_nick_conflict(e):
                        # Anti-ban: 409 Conflict - try the next nick variant.
                        if not self._next_nick_variant(room):
                            print(f'[{_ts()}] MUC [{room}] All nick variations exhausted')
                            nicks_exhausted = True
                            break
                        print(f'[{_ts()}] MUC [{room}] Nick conflict, switching to: {self._get_muc_nick(room)}')
                        # Retry immediately with the new nick (no wait).
                        continue
                    if attempt < 3:
                        # Anti-ban: ordinary error, wait before retrying (2s, 4s).
                        retry_delay = 2.0 * attempt
                        print(f'[{_ts()}] MUC [{room}] Retrying in {retry_delay:.0f}s...')
                        await asyncio.sleep(retry_delay)
                else:
                    print(f'[{_ts()}] Joined MUC room: {room} as {nick}')
                    self._muc_last_join[room] = datetime.now()
                    self._muc_rejoin_attempts.pop(room, None)
                    self._muc_rejoin_attempts.pop("_nick_" + room, None)
                    joined = True
                    break

            if joined or nicks_exhausted:
                # Exhausted nick variants must not trigger further retries (anti-ban).
                continue
            # Anti-ban: all immediate attempts failed, fall back to background rejoin.
            print(f'[{_ts()}] MUC [{room}] All join attempts failed, scheduling background rejoin')
            self._schedule_muc_rejoin(room)

    # ------------------------------------------------------------------
    # Incoming stanzas
    # ------------------------------------------------------------------

    def _on_message(self, msg):
        """Dispatch a non-empty direct message to a worker thread."""
        if msg['type'] not in ('chat', 'normal'):
            return
        jid = msg['from'].bare
        body = msg['body'].strip()
        if not body:
            return
        print(f'[{_ts()}] DM from {jid}: {body[:60]}')
        threading.Thread(target=self._process_dm, args=(jid, body), daemon=True).start()

    def _on_groupchat_message(self, msg):
        """Dispatch a non-empty group-chat message from another occupant to a worker thread."""
        if msg['type'] != 'groupchat':
            return
        room = msg['from'].bare
        nick = msg['from'].resource
        if self._is_my_nick(room, nick):
            # Never react to our own messages.
            return
        if room not in self._muc_ready:
            return
        body = msg['body'].strip()
        if not body:
            return
        print(f'[{_ts()}] MUC [{room}] <{nick}>: {body[:60]}')
        threading.Thread(target=self._process_muc, args=(room, nick, body), daemon=True).start()

    def _on_muc_presence(self, presence):
        """Track room readiness from MUC presence and trigger rejoin when we drop out.

        Only the bot's own presence changes the ready state; other occupants
        joining or leaving are just logged.
        """
        room = presence['from'].bare
        nick = presence['from'].resource
        ptype = presence['type']
        mine = self._is_my_nick(room, nick)

        if ptype == 'unavailable':
            print(f'[{_ts()}] MUC [{room}] <{nick}> left')
            if mine:
                # We left or were removed: mark the room not ready and rejoin with backoff.
                self._muc_ready.discard(room)
                self._schedule_muc_rejoin(room)
        elif ptype == 'error':
            print(f'[{_ts()}] MUC [{room}] error: {presence}')
            if mine:
                # Anti-ban: also rejoin on error (e.g. temporary failure).
                self._muc_ready.discard(room)
                self._schedule_muc_rejoin(room)
        else:
            if mine:
                self._muc_ready.add(room)
                # Reset counters on a successful join so backoff does not accumulate.
                self._muc_rejoin_attempts.pop(room, None)
                self._muc_rejoin_attempts.pop("_nick_" + room, None)
            print(f'[{_ts()}] MUC [{room}] <{nick}> joined (type={ptype})')

    # ------------------------------------------------------------------
    # Message processing (runs in worker threads)
    # ------------------------------------------------------------------

    def _simulate_read_delay(self):
        """Block the calling worker thread for the simulated reading delay.

        The sleep runs on the event loop and this waits for it to finish.
        Must not be called from the event-loop thread, where waiting would
        block the loop itself.
        """
        if not self._loop or self._loop.is_closed():
            return
        future = asyncio.run_coroutine_threadsafe(_read_delay(), self._loop)
        try:
            # Bounded wait so a stopped loop cannot hang the worker forever.
            future.result(timeout=config.READ_DELAY_MAX + 5.0)
        except Exception as e:
            print(f'[{_ts()}] WARNING: read delay interrupted: {e}')

    def _process_dm(self, jid, body):
        """Handle one direct message: manage the session and run the agent loop."""
        session = self._session_mgr.get_or_create(
            jid, self._build_system_prompt(self._tools_def)
        )
        session.cancel_timer()

        # Approve the sender's subscription from the loop thread, like every
        # other send, because this method runs in a worker thread.
        if self._loop and not self._loop.is_closed():
            self._loop.call_soon_threadsafe(
                lambda: self.send_presence_subscription(pto=jid, ptype='subscribed')
            )

        if body == ':new':
            self._session_mgr.reset(jid)
            print(f'[{_ts()}] Session reset for {jid}')
            self._schedule_send(jid, 'Memulai sesi baru. Ada yang bisa di bantu?')
            return

        session.add_message('user', body)

        self._schedule_send(jid, f'> {body}\nThinking...')

        # Delay 1: simulate reading the user's message.
        self._simulate_read_delay()

        self._agent_loop(session, jid, body, 'chat')

        session.start_timer(300, self._timeout_session, jid, 'chat')

    def _process_muc(self, room, nick, body):
        """Handle one group-chat message: manage the room session and run the agent loop."""
        session = self._session_mgr.get_or_create(
            room, self._build_system_prompt(self._tools_def)
        )
        session.cancel_timer()

        if body == ':new':
            self._session_mgr.reset(room)
            print(f'[{_ts()}] Session reset for MUC room {room}')
            self._schedule_send(room, 'Memulai sesi baru. Ada yang bisa di bantu?', mtype='groupchat')
            return

        prefixed = f'[{nick}] {body}'
        session.add_message('user', prefixed)

        self._schedule_send(room, f'> [{nick}] {body}\nThinking...', mtype='groupchat')

        # Delay 1: simulate reading the user's message.
        self._simulate_read_delay()

        self._agent_loop(session, room, f'[{nick}] {body}', 'groupchat')

        session.start_timer(300, self._timeout_session, room, 'groupchat')

    def _agent_loop(self, session, to, quote, mtype):
        """Run LLM/tool iterations until a final answer or the iteration cap.

        Args:
            session: Session whose ``messages`` list is sent to and extended by the loop.
            to: Recipient JID (user or room) for progress and final messages.
            quote: Text quoted at the top of every reply.
            mtype: Message type used for replies ('chat' or 'groupchat').
        """
        for step in range(self._max_iterations):
            print(f'[{_ts()}] Step {step + 1} — calling LLM...')
            response = self._llm.chat(session.messages, tools=self._TOOLS)

            if not response.tool_calls:
                # Final turn: send the content, if any, and stop.
                if response.content:
                    print(f'[{_ts()}] Response sent ({len(response.content)} chars)')
                    session.messages.append({'role': 'assistant', 'content': response.content})
                    self._schedule_send(to, f'> {quote}\n{response.content}', mtype)
                return

            session.messages.append({
                'role': 'assistant',
                'content': response.content,
                'tool_calls': response.tool_calls,
            })

            tnames = [tc['function']['name'] for tc in response.tool_calls]
            print(f'[{_ts()}] Using tools: {", ".join(tnames)}')
            self._schedule_send(to, f'> {quote}\nUsing: {", ".join(tnames)}', mtype)

            for tc in response.tool_calls:
                result = self._execute_tool(tc)
                session.messages.append({
                    'role': 'tool',
                    'tool_call_id': tc['id'],
                    'content': str(result),
                })

        print(f'[{_ts()}] Max iterations ({self._max_iterations}) reached')
        session.messages.append({
            'role': 'assistant',
            'content': 'Max iterations reached without final answer.',
        })
        self._schedule_send(to, f'> {quote}\nMax iterations reached without final answer.', mtype)

    def _execute_tool(self, tool_call):
        """Run the handler for *tool_call* and return its result or an error string.

        Argument parsing happens inside the ``try`` so malformed JSON from the
        model is reported back as a tool error instead of raising in the
        worker thread. Empty arguments are treated as ``{}`` and arguments
        that are already a dict are used as-is.
        """
        tname = tool_call['function']['name']
        handler = self._TOOL_HANDLERS.get(tname)
        if not handler:
            return f'Tool {tname} not found'
        try:
            raw_args = tool_call['function']['arguments']
            if isinstance(raw_args, dict):
                targs = raw_args
            else:
                targs = json.loads(raw_args) if raw_args else {}

            if tname == 'search_code':
                return handler(
                    pattern=targs['pattern'],
                    search_type=targs['search_type'],
                    path=targs.get('path', '.'),
                )
            if tname == 'git_operation':
                return handler(args=targs['args'])
            return handler(**targs)
        except Exception as e:
            return f'Error executing tool: {str(e)}'

    # ------------------------------------------------------------------
    # Outgoing messages
    # ------------------------------------------------------------------

    def _schedule_send(self, to, body, mtype='chat'):
        """Queue *body* for delivery to *to* on the event loop (safe from worker threads)."""
        if self._loop and not self._loop.is_closed():
            asyncio.run_coroutine_threadsafe(
                self._send_coro(to, body, mtype), self._loop
            )
        else:
            print(f'[{_ts()}] WARNING: cannot send to {to} — loop unavailable')

    async def _send_coro(self, to, body, mtype):
        """Wait for the simulated typing delay, then send the message."""
        try:
            # Delay 2: simulate typing (proportional to the message length).
            delay = _typing_delay(body)
            print(f'[{_ts()}] Typing delay: {delay:.1f}s ({len(body)} chars)')
            await asyncio.sleep(delay)

            msg = self.make_message(mto=to, mbody=body, mtype=mtype)
            msg.send()
        except Exception as e:
            print(f'[{_ts()}] SEND ERROR: {e}')

    def _timeout_session(self, session_id, mtype):
        """Notify the peer that the idle session is closed and reset it."""
        print(f'[{_ts()}] Session timeout: {session_id}')
        self._schedule_send(session_id, 'Sesi ditutup. Sampai jumpa', mtype)
        self._session_mgr.reset(session_id)

    # ------------------------------------------------------------------
    # Service lifecycle
    # ------------------------------------------------------------------

    def start(self):
        """Run the client until it is stopped (blocks the calling thread)."""
        print(f'[{_ts()}] Starting XMPP service...')
        asyncio.run(self._run())

    async def _run(self):
        """Connect, wait for the stop event or a termination signal, then disconnect."""
        import inspect

        self._stopped = asyncio.Event()
        self._loop = asyncio.get_running_loop()
        # Look signals up by name: SIGHUP does not exist on every platform.
        for sig_name in ('SIGTERM', 'SIGHUP'):
            sig = getattr(signal, sig_name, None)
            if sig is None:
                continue
            try:
                self._loop.add_signal_handler(sig, self._stopped.set)
            except NotImplementedError:
                pass
        # NOTE(review): presumably connect() returns an awaitable in the
        # slixmpp release in use; await only when it does - TODO confirm.
        attempt = self.connect()
        if inspect.isawaitable(attempt):
            await attempt
        try:
            await self._stopped.wait()
        except (asyncio.CancelledError, KeyboardInterrupt):
            pass
        print(f'[{_ts()}] Shutting down...')
        await self.disconnect()

    def stop(self):
        """Request shutdown from any thread."""
        if self._loop and not self._loop.is_closed():
            asyncio.run_coroutine_threadsafe(self._async_stop(), self._loop)

    async def _async_stop(self):
        """Set the stop event on the loop thread (no-op before ``_run`` has started)."""
        if self._stopped is not None:
            self._stopped.set()
|