import asyncio
import concurrent.futures
import json
import random
import signal
import threading
from datetime import datetime

from slixmpp import ClientXMPP

import config
from scripts.persona import PERSONALITY
from services.session_manager import SessionManager
from tools.roleplayer import should_respond

# Anti-ban: delay constants for MUC rejoin behaviour.
MUC_REJOIN_INITIAL_DELAY = 5.0   # seconds, initial delay before a rejoin
MUC_REJOIN_BACKOFF_MULT = 2.0    # exponential backoff multiplier
MUC_REJOIN_MAX_DELAY = 300.0     # seconds, upper bound for the backoff (5 minutes)
MUC_REJOIN_COOLDOWN = 10.0       # seconds, minimum gap between rejoin attempts
MUC_NICK_SUFFIX_MAX = 3          # max alternative nicks to try (keep small to avoid bans)

# Number of inline join attempts made per room right after the session starts.
MUC_INITIAL_JOIN_ATTEMPTS = 3

# Idle timeouts (seconds) after which a session is closed.
DM_SESSION_TIMEOUT = 86400   # 24 hours: effectively no auto-close for DMs
MUC_SESSION_TIMEOUT = 300    # 5 minutes for group chats


def _ts():
    """Return the current local time formatted as HH:MM:SS for log prefixes."""
    return datetime.now().strftime('%H:%M:%S')


def _typing_delay(text: str) -> float:
    """Return a typing delay in seconds proportional to the length of *text*.

    The delay is ``len(text) / config.TYPING_SPEED`` clamped to the range
    ``[1.0, config.TYPING_MAX]`` (the lower bound wins if they conflict).
    """
    char_count = len(text) if text else 0
    delay = char_count / config.TYPING_SPEED
    return max(1.0, min(delay, config.TYPING_MAX))


async def _read_delay():
    """Sleep for a random time to simulate reading the user's message."""
    delay = random.uniform(config.READ_DELAY_MIN, config.READ_DELAY_MAX)
    await asyncio.sleep(delay)


class XMPPClient(ClientXMPP):
    """XMPP client that answers direct and group-chat messages through an LLM.

    Stanza handlers run on the asyncio event loop; each incoming message is
    processed in its own daemon thread, and every outgoing stanza is handed
    back to the loop so that sending never happens from a worker thread.
    """

    def __init__(self, jid, password, llm_client, tools_definition, TOOLS,
                 TOOL_HANDLERS, build_system_prompt, agent_max_iterations,
                 muc_rooms=None):
        super().__init__(jid, password)
        self._llm = llm_client
        self._tools_def = tools_definition
        self._TOOLS = TOOLS
        self._TOOL_HANDLERS = TOOL_HANDLERS
        self._build_system_prompt = build_system_prompt
        self._max_iterations = agent_max_iterations
        self._skill = config.AGENT_SKILL
        self._muc_rooms = muc_rooms or []
        # Custom nick from config, falling back to the JID's local part.
        self._muc_nick = config.XMPP_NICKNAME.strip() or jid.split('@')[0]
        self._muc_nick_suffix = 0  # counter for alternative nicks on 409
        self._muc_ready: set[str] = set()
        self._session_mgr = SessionManager()
        self._loop = None
        self._stopped: asyncio.Event | None = None

        # Anti-ban: per-room rejoin tracking.
        # room -> attempt count; the key "_nick_" + room holds the nick suffix.
        self._muc_rejoin_attempts: dict[str, int] = {}
        # room -> pending rejoin future (from run_coroutine_threadsafe).
        self._muc_rejoin_tasks: dict[str, concurrent.futures.Future] = {}
        # room -> time of the last successful join (used for the cooldown).
        self._muc_last_join: dict[str, datetime] = {}
        # room -> nick the bot actually occupies. Kept separately because the
        # suffix counter is reset after a successful join, so it cannot be
        # used afterwards to recognise the bot's own stanzas.
        self._muc_joined_nick: dict[str, str] = {}

        self.auto_reconnect = True
        self.register_plugin('xep_0030')
        self.register_plugin('xep_0045')
        self.register_plugin('xep_0199')
        self.add_event_handler('session_start', self._on_session_start)
        self.add_event_handler('message', self._on_message)
        self.add_event_handler('groupchat_message', self._on_groupchat_message)
        self.add_event_handler('disconnected', self._on_disconnected)
        self.add_event_handler('connected', self._on_connected)
        self.add_event_handler('groupchat_presence', self._on_muc_presence)

    # ------------------------------------------------------------------
    # MUC join / rejoin handling
    # ------------------------------------------------------------------

    def _get_muc_nick(self, room: str) -> str:
        """Return the nick to request for *room*.

        This is the base nick, or ``<base>_<n>`` once ``n`` nick conflicts
        have been recorded for the room.
        """
        base = config.XMPP_NICKNAME.strip() or self._muc_nick
        suffix = self._muc_rejoin_attempts.get("_nick_" + room, 0)
        if suffix == 0:
            return base
        # Anti-ban: append a suffix to get around 409 Conflict.
        return f"{base}_{suffix}"

    def _calc_rejoin_delay(self, room: str) -> float:
        """Return the exponential-backoff rejoin delay (seconds) for *room*."""
        attempts = self._muc_rejoin_attempts.get(room, 0)
        delay = MUC_REJOIN_INITIAL_DELAY * (MUC_REJOIN_BACKOFF_MULT ** attempts)
        return min(delay, MUC_REJOIN_MAX_DELAY)

    def _schedule_muc_rejoin(self, room: str):
        """Schedule a delayed rejoin of *room*, honouring backoff and cooldown.

        Any rejoin still pending for the same room is cancelled first so that
        only one attempt is in flight per room.
        """
        if not self._loop or self._loop.is_closed():
            # Nothing can be scheduled; leave the backoff counter untouched
            # instead of counting an attempt that never ran.
            print(f'[{_ts()}] WARNING: cannot schedule rejoin for {room} — loop unavailable')
            return

        # Cancel the pending rejoin for this room (avoid duplicate rejoins).
        pending = self._muc_rejoin_tasks.get(room)
        if pending and not pending.done():
            pending.cancel()
            print(f'[{_ts()}] MUC [{room}] Cancelled pending rejoin (new trigger)')

        # Cooldown: never rejoin too quickly after the previous join.
        delay = self._calc_rejoin_delay(room)
        last_join = self._muc_last_join.get(room)
        if last_join:
            elapsed = (datetime.now() - last_join).total_seconds()
            if elapsed < MUC_REJOIN_COOLDOWN:
                cooldown_left = MUC_REJOIN_COOLDOWN - elapsed
                print(f'[{_ts()}] MUC [{room}] Cooldown active ({cooldown_left:.0f}s left), delaying rejoin')
                delay += cooldown_left

        # Track the attempt so the next delay grows exponentially.
        attempts = self._muc_rejoin_attempts.get(room, 0) + 1
        self._muc_rejoin_attempts[room] = attempts
        print(f'[{_ts()}] MUC [{room}] Rejoin scheduled in {delay:.0f}s (attempt #{attempts})')

        self._muc_rejoin_tasks[room] = asyncio.run_coroutine_threadsafe(
            self._muc_rejoin_coro(room, delay), self._loop
        )

    async def _muc_rejoin_coro(self, room: str, delay: float):
        """Wait *delay* seconds, then try to rejoin *room*.

        On a nick conflict the next alternative nick is selected (up to
        ``MUC_NICK_SUFFIX_MAX``) and another rejoin is scheduled; on any
        other failure a rejoin is scheduled with backoff.
        """
        try:
            await asyncio.sleep(delay)
            # Double-check: the room may have become ready in the meantime.
            if room in self._muc_ready:
                print(f'[{_ts()}] MUC [{room}] Already ready, skip rejoin')
                return
            nick = self._get_muc_nick(room)
            print(f'[{_ts()}] MUC [{room}] Rejoining as {nick}...')
            await self.plugin['xep_0045'].join_muc_wait(room, nick, maxstanzas=0)
            self._muc_last_join[room] = datetime.now()
            # _muc_ready is set by _on_muc_presence when the join succeeds.
            self._muc_joined_nick.setdefault(room, nick)
            self._muc_rejoin_attempts.pop(room, None)
            self._muc_rejoin_attempts.pop("_nick_" + room, None)
            print(f'[{_ts()}] MUC [{room}] Rejoin successful as {nick}')
        except asyncio.CancelledError:
            print(f'[{_ts()}] MUC [{room}] Rejoin cancelled')
        except Exception as e:
            print(f'[{_ts()}] MUC [{room}] Rejoin failed: {e}')
            if self._is_nick_conflict(e):
                # 409 Conflict: the nick is already used by someone else.
                nick_attempts = self._muc_rejoin_attempts.get("_nick_" + room, 0)
                if nick_attempts < MUC_NICK_SUFFIX_MAX:
                    self._muc_rejoin_attempts["_nick_" + room] = nick_attempts + 1
                    new_nick = self._get_muc_nick(room)
                    print(f'[{_ts()}] MUC [{room}] Nick conflict, trying alternative: {new_nick}')
                    # Retry with the new nick; the regular scheduling delay
                    # still applies.
                    self._schedule_muc_rejoin(room)
                else:
                    # All alternatives used up: stop retrying to avoid a ban.
                    print(f'[{_ts()}] MUC [{room}] All nick variations exhausted, skipping room')
                    print(f'[{_ts()}] MUC [{room}] Set XMPP_NICKNAME in .env to a unique nick')
            else:
                # Ordinary error (network etc.): retry with backoff.
                self._schedule_muc_rejoin(room)

    @staticmethod
    def _is_nick_conflict(error) -> bool:
        """Return True when *error*'s text indicates a 409 Conflict."""
        text = str(error)
        return '409' in text or 'conflict' in text.lower()

    async def _on_connected(self, event):
        """Log that the transport connection is up."""
        print(f'[{_ts()}] XMPP connected')

    async def _on_disconnected(self, event):
        """Cancel pending rejoins and drop per-connection MUC state."""
        print(f'[{_ts()}] XMPP disconnected')
        for room, task in list(self._muc_rejoin_tasks.items()):
            if not task.done():
                task.cancel()
                print(f'[{_ts()}] MUC [{room}] Cancelled pending rejoin (disconnected)')
        self._muc_rejoin_tasks.clear()
        # No room is joined once the connection is gone. Stale entries would
        # make _muc_rejoin_coro skip rooms as "already ready" after reconnect.
        self._muc_ready.clear()
        self._muc_joined_nick.clear()

    async def _on_session_start(self, event):
        """Announce presence, fetch the roster and join the configured rooms.

        Each room gets up to ``MUC_INITIAL_JOIN_ATTEMPTS`` inline attempts
        with nick fallback; if all fail, a background rejoin is scheduled.
        """
        self.send_presence()
        self.get_roster()
        print(f'[{_ts()}] XMPP online as {self.boundjid.full}')
        for room in self._muc_rooms:
            success = False
            for attempt in range(1, MUC_INITIAL_JOIN_ATTEMPTS + 1):
                nick = self._get_muc_nick(room)
                try:
                    await self.plugin['xep_0045'].join_muc_wait(room, nick, maxstanzas=0)
                except Exception as e:
                    print(f'[{_ts()}] MUC join attempt #{attempt} failed ({room}): {e}')
                    if self._is_nick_conflict(e):
                        # 409 Conflict: switch to the next alternative nick.
                        nick_attempts = self._muc_rejoin_attempts.get("_nick_" + room, 0)
                        if nick_attempts >= MUC_NICK_SUFFIX_MAX:
                            print(f'[{_ts()}] MUC [{room}] All nick variations exhausted')
                            break
                        self._muc_rejoin_attempts["_nick_" + room] = nick_attempts + 1
                        print(f'[{_ts()}] MUC [{room}] Nick conflict, switching to: {self._get_muc_nick(room)}')
                        # Retry immediately with the new nick (no wait).
                        continue
                    if attempt < MUC_INITIAL_JOIN_ATTEMPTS:
                        # Ordinary error: wait before retrying (2s, 4s).
                        retry_delay = 2.0 * attempt
                        print(f'[{_ts()}] MUC [{room}] Retrying in {retry_delay:.0f}s...')
                        await asyncio.sleep(retry_delay)
                else:
                    print(f'[{_ts()}] Joined MUC room: {room} as {nick}')
                    self._muc_last_join[room] = datetime.now()
                    self._muc_joined_nick.setdefault(room, nick)
                    self._muc_rejoin_attempts.pop(room, None)
                    self._muc_rejoin_attempts.pop("_nick_" + room, None)
                    success = True
                    break
            if not success:
                print(f'[{_ts()}] MUC [{room}] All join attempts failed, scheduling background rejoin')
                self._schedule_muc_rejoin(room)

    # ------------------------------------------------------------------
    # Incoming stanza handlers
    # ------------------------------------------------------------------

    def _on_message(self, msg):
        """Dispatch a non-empty direct message to a worker thread."""
        if msg['type'] not in ('chat', 'normal'):
            return
        jid = msg['from'].bare
        body = msg['body'].strip()
        if not body:
            return
        print(f'[{_ts()}] DM from {jid}: {body[:60]}')
        threading.Thread(target=self._process_dm, args=(jid, body), daemon=True).start()

    def _on_groupchat_message(self, msg):
        """Dispatch a non-empty group-chat message to a worker thread.

        The bot's own messages and messages from rooms that are not yet
        marked ready are ignored.
        """
        if msg['type'] != 'groupchat':
            return
        room = msg['from'].bare
        nick = msg['from'].resource
        if self._is_my_nick(room, nick):
            return
        if room not in self._muc_ready:
            return
        body = msg['body'].strip()
        if not body:
            return
        print(f'[{_ts()}] MUC [{room}] <{nick}>: {body[:60]}')
        threading.Thread(target=self._process_muc, args=(room, nick, body), daemon=True).start()

    def _is_my_nick(self, room: str, nick: str) -> bool:
        """Return True if *nick* is the bot's own nick in *room*.

        Once a join has been confirmed, the recorded nick is authoritative.
        Before that, *nick* is compared with the nick currently being
        requested. The plain base nick is deliberately not accepted while a
        suffixed nick is in use, because it then belongs to another occupant.
        """
        joined = self._muc_joined_nick.get(room)
        if joined is not None:
            return nick == joined
        return nick == self._get_muc_nick(room)

    def _on_muc_presence(self, presence):
        """Track the bot's room membership from MUC presence stanzas.

        Only the bot's own presence changes readiness: another occupant
        leaving must not take the room out of the ready set.
        """
        room = presence['from'].bare
        nick = presence['from'].resource
        ptype = presence['type']
        # Evaluate once, before any state below is modified.
        is_mine = self._is_my_nick(room, nick)

        if ptype == 'unavailable':
            print(f'[{_ts()}] MUC [{room}] <{nick}> left')
            if is_mine:
                self._mark_left(room)
                # Auto-rejoin with exponential backoff.
                self._schedule_muc_rejoin(room)
        elif ptype == 'error':
            print(f'[{_ts()}] MUC [{room}] error: {presence}')
            if is_mine:
                # Also rejoin on errors (e.g. temporary failure).
                self._mark_left(room)
                self._schedule_muc_rejoin(room)
        else:
            if is_mine:
                self._muc_ready.add(room)
                self._muc_joined_nick[room] = nick
                # Reset counters on a successful join so that backoff does
                # not accumulate.
                self._muc_rejoin_attempts.pop(room, None)
                self._muc_rejoin_attempts.pop("_nick_" + room, None)
            print(f'[{_ts()}] MUC [{room}] <{nick}> joined (type={ptype})')

    def _mark_left(self, room: str):
        """Forget that the bot is present in *room*."""
        self._muc_ready.discard(room)
        self._muc_joined_nick.pop(room, None)

    # ------------------------------------------------------------------
    # Message processing (runs in worker threads)
    # ------------------------------------------------------------------

    def _new_system_prompt(self):
        """Build the system prompt used when a session is first created."""
        return self._build_system_prompt(
            tools_definition=self._tools_def,
            character=config.AGENT_CHARACTER or None,
            skills=config.AGENT_SKILLS.split(",") if config.AGENT_SKILLS else None,
        )

    def _simulate_read_delay(self):
        """Block the calling worker thread for the simulated reading delay.

        The delay coroutine runs on the event loop; waiting on the returned
        future is what actually postpones the reply. The wait is bounded so
        that a stopped loop cannot hang the worker forever.
        """
        if not self._loop or self._loop.is_closed():
            return
        future = asyncio.run_coroutine_threadsafe(_read_delay(), self._loop)
        try:
            future.result(timeout=config.READ_DELAY_MAX + 5.0)
        except (concurrent.futures.TimeoutError, concurrent.futures.CancelledError):
            future.cancel()

    def _process_dm(self, jid, body):
        """Handle one direct message from *jid* (worker thread)."""
        session = self._session_mgr.get_or_create(jid, self._new_system_prompt())
        session.cancel_timer()
        # Approve the sender's subscription. The stanza is sent from the
        # event loop because this method runs in a worker thread.
        if self._loop and not self._loop.is_closed():
            self._loop.call_soon_threadsafe(
                lambda: self.send_presence_subscription(pto=jid, ptype='subscribed')
            )
        if body == ':new':
            self._session_mgr.reset(jid)
            print(f'[{_ts()}] Session reset for {jid}')
            self._schedule_send(jid, 'Memulai sesi baru. Ada yang bisa di bantu?')
            return
        session.add_message('user', body)
        if self._skill != 'roleplayer':
            self._schedule_send(jid, f'> {body}\nThinking...')
        try:
            # Delay 1: simulate reading the user's message.
            self._simulate_read_delay()
            self._agent_loop(session, jid, body, 'chat', sender_nickname=jid)
        finally:
            # Restart the idle timer even if the agent loop raised.
            session.start_timer(DM_SESSION_TIMEOUT, self._timeout_session, jid, 'chat')

    def _process_muc(self, room, nick, body):
        """Handle one group-chat message from *nick* in *room* (worker thread)."""
        session = self._session_mgr.get_or_create(room, self._new_system_prompt())
        session.cancel_timer()
        if body == ':new':
            self._session_mgr.reset(room)
            print(f'[{_ts()}] Session reset for MUC room {room}')
            self._schedule_send(room, 'Memulai sesi baru. Ada yang bisa di bantu?', mtype='groupchat')
            return
        prefixed = f'[{nick}] {body}'
        session.add_message('user', prefixed)
        if self._skill != 'roleplayer':
            self._schedule_send(room, f'> {prefixed}\nThinking...', mtype='groupchat')
        try:
            # Delay 1: simulate reading the user's message.
            self._simulate_read_delay()
            self._agent_loop(session, room, prefixed, 'groupchat', sender_nickname=nick)
        finally:
            # Restart the idle timer even if the agent loop raised.
            session.start_timer(MUC_SESSION_TIMEOUT, self._timeout_session, room, 'groupchat')

    def _agent_loop(self, session, to, quote, mtype, sender_nickname=""):
        """Run the LLM/tool loop for one user message and send the outcome.

        The LLM is called up to ``self._max_iterations`` times. Tool calls
        are executed and their results appended to the session; the first
        response without tool calls ends the loop. In roleplayer mode the
        reply is only sent when ``should_respond`` (selective mode) or the
        name-mention check allows it.
        """
        is_roleplayer = self._skill == 'roleplayer'
        my_name = PERSONALITY.name

        for step in range(self._max_iterations):
            print(f'[{_ts()}] Step {step + 1} — calling LLM...')
            response = self._llm.chat(session.messages, tools=self._TOOLS)

            if response.tool_calls:
                session.messages.append({
                    'role': 'assistant',
                    'content': response.content,
                    'tool_calls': response.tool_calls,
                })
                tnames = [tc['function']['name'] for tc in response.tool_calls]
                print(f'[{_ts()}] Using tools: {", ".join(tnames)}')
                # The roleplayer does not send "Using: ..." status messages.
                if not is_roleplayer:
                    self._schedule_send(to, f'> {quote}\nUsing: {", ".join(tnames)}', mtype)
                for tc in response.tool_calls:
                    result = self._execute_tool(tc)
                    session.messages.append({
                        'role': 'tool',
                        'tool_call_id': tc['id'],
                        'content': str(result),
                    })
                continue

            # No tool calls: this is the final answer for this message.
            if not response.content:
                print(f'[{_ts()}] Empty response from LLM, nothing to send')
                return

            print(f'[{_ts()}] Response generated ({len(response.content)} chars)')
            session.messages.append({'role': 'assistant', 'content': response.content})

            if not is_roleplayer:
                self._schedule_send(to, f'> {quote}\n{response.content}', mtype)
            elif config.XMPP_SELECTIVE_RESPONSE:
                # Recent history from the session messages (user/assistant
                # turns only, so the system prompt is left out).
                recent_msgs = []
                for msg in session.messages[-6:]:
                    if msg.get('role') == 'user':
                        recent_msgs.append(f"User: {msg.get('content', '')}")
                    elif msg.get('role') == 'assistant' and msg.get('content'):
                        recent_msgs.append(f"{my_name}: {msg.get('content', '')}")
                if should_respond(
                    message=quote,
                    sender_nickname=sender_nickname,
                    recent_history="\n".join(recent_msgs),
                    my_name=my_name,
                ):
                    print(f'[{_ts()}] need_response=True → sending response')
                    self._schedule_send(to, response.content, mtype)
                else:
                    print(f'[{_ts()}] need_response=False → staying silent')
            else:
                # Selective response OFF: only reply when the AI's name is
                # mentioned in the message.
                from tools.roleplayer import _name_mentioned
                if _name_mentioned(my_name, quote):
                    print(f'[{_ts()}] Name mentioned → sending response')
                    self._schedule_send(to, response.content, mtype)
                else:
                    print(f'[{_ts()}] Name not mentioned → staying silent')
            return

        print(f'[{_ts()}] Max iterations ({self._max_iterations}) reached')
        session.messages.append({
            'role': 'assistant',
            'content': 'Max iterations reached without final answer.',
        })
        if is_roleplayer:
            self._schedule_send(to, 'Max iterations reached without final answer.', mtype)
        else:
            self._schedule_send(to, f'> {quote}\nMax iterations reached without final answer.', mtype)

    def _execute_tool(self, tool_call):
        """Run the handler for *tool_call* and return its result.

        Failures (unknown tool, malformed arguments, handler exceptions) are
        returned as an error string rather than raised, so the LLM always
        receives a tool reply.
        """
        tname = tool_call['function']['name']
        handler = self._TOOL_HANDLERS.get(tname)
        if not handler:
            return f'Tool {tname} not found'
        try:
            raw_args = tool_call['function']['arguments']
            # A missing/empty argument string means "no arguments".
            targs = json.loads(raw_args) if raw_args else {}
            if tname == 'search_code':
                return handler(
                    pattern=targs['pattern'],
                    search_type=targs['search_type'],
                    path=targs.get('path', '.'),
                )
            if tname == 'git_operation':
                return handler(args=targs['args'])
            return handler(**targs)
        except Exception as e:
            return f'Error executing tool: {str(e)}'

    # ------------------------------------------------------------------
    # Sending
    # ------------------------------------------------------------------

    def _schedule_send(self, to, body, mtype='chat'):
        """Queue *body* for delivery to *to* on the event loop (thread-safe)."""
        if self._loop and not self._loop.is_closed():
            asyncio.run_coroutine_threadsafe(
                self._send_coro(to, body, mtype), self._loop
            )
        else:
            print(f'[{_ts()}] WARNING: cannot send to {to} — loop unavailable')

    async def _send_coro(self, to, body, mtype):
        """Wait for the simulated typing delay, then send the message."""
        try:
            # Delay 2: simulate typing (proportional to the message length).
            delay = _typing_delay(body)
            print(f'[{_ts()}] Typing delay: {delay:.1f}s ({len(body)} chars)')
            await asyncio.sleep(delay)
            msg = self.make_message(mto=to, mbody=body, mtype=mtype)
            msg.send()
        except Exception as e:
            print(f'[{_ts()}] SEND ERROR: {e}')

    def _timeout_session(self, session_id, mtype):
        """Say goodbye and reset the session after its idle timer fires."""
        print(f'[{_ts()}] Session timeout: {session_id}')
        self._schedule_send(session_id, 'Sesi ditutup. Sampai jumpa', mtype)
        self._session_mgr.reset(session_id)

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    def start(self):
        """Run the client until it is stopped (blocks the calling thread)."""
        print(f'[{_ts()}] Starting XMPP service...')
        asyncio.run(self._run())

    async def _run(self):
        """Connect, wait for the stop event, then disconnect."""
        self._stopped = asyncio.Event()
        self._loop = asyncio.get_running_loop()
        # Only SIGTERM triggers a shutdown. No SIGHUP handler is installed
        # here on purpose: the process should not shut down when the
        # controlling terminal closes.
        # NOTE(review): this presumably relies on running under nohup — confirm.
        try:
            self._loop.add_signal_handler(signal.SIGTERM, self._stopped.set)
        except NotImplementedError:
            # Signal handlers are not supported by every event loop/platform.
            pass
        await self.connect()
        try:
            await self._stopped.wait()
        except (asyncio.CancelledError, KeyboardInterrupt):
            pass
        print(f'[{_ts()}] Shutting down...')
        await self.disconnect()

    def stop(self):
        """Request shutdown from any thread."""
        if self._loop and not self._loop.is_closed():
            asyncio.run_coroutine_threadsafe(self._async_stop(), self._loop)

    async def _async_stop(self):
        """Set the stop event from within the event loop."""
        self._stopped.set()