Source code for acord.core.heartbeat

# Basic heartbeat controller
from abc import ABC, abstractmethod
from threading import Thread
import asyncio
import time
from .signals import gateway  # type: ignore
import logging

logger = logging.getLogger(__name__)


[docs]class KeepAlive(Thread, ABC): """Represents a keep alive handler. .. DANGER:: If you are not overwriting :meth:`KeepAlive.run`, you will need to provide the following attrs. Attributes ---------- _ended: :class:`bool` Whether heartbeating has ended _interval: :class:`int` Time to wait between heartbeats, in seconds. """ _ended: bool _interval: int
[docs] def run(self): """ Default .run function, calls :meth:`KeepAlive.send_heartbeat` every n seconds. .. note:: Once ended, ``.join`` is called within this function """ while not self._ended: self.send_heartbeat() time.sleep(self._interval) self.join()
[docs] @abstractmethod def send_heartbeat(self): """ Sends a heartbeat """
[docs] @abstractmethod def get_payload(self): """ Gets heartbeat payload """
[docs] @abstractmethod def ack(self): """ Called when server responds with an ACK to our heartbeat """
class GatewayKeepAlive(KeepAlive): def __init__(self, shard, interval, loop=asyncio.get_event_loop()): self.sent_at = None self.latency = float("inf") self.shard = shard self._loop: asyncio.AbstractEventLoop = loop self._ws = shard.ws self._interval = interval / 1000 self._ended = False self._waiting_for_ack = False super().__init__(daemon=True) def run(self): while not self._ended: self.send_heartbeat() time.sleep(self._interval) self.join() def send_heartbeat(self): coro = self._ws.send_json(self.get_payload()) asyncio.run_coroutine_threadsafe(coro, self._loop) self.sent_at = time.perf_counter() self._waiting_for_ack = True logger.info(f"Sent heartbeat for shard {self.shard.shard_id}, waiting {self._interval} seconds...") def get_payload(self): return {"op": gateway.HEARTBEAT, "d": self.shard.sequence} def ack(self): d = time.perf_counter() self._waiting_for_ack = False self.latency = d - self.sent_at class VoiceKeepAlive(KeepAlive): def __init__(self, connection, packet, loop=asyncio.get_event_loop()) -> None: self.integer_nonce = 0 self.sent_at = None self.latency = float("inf") self.connection = connection self._loop = loop self._interval = packet["d"]["heartbeat_interval"] / 1000 self._ended = False self._ws = connection._ws def send_heartbeat(self): coro = self._ws.send_json(self.get_payload()) asyncio.run_coroutine_threadsafe(coro, self._loop) self.sent_at = time.perf_counter() self._waiting_for_ack = True logger.info(f"Sent Heartbeat to voice channel, conn_id={self.connection}") def ack(self): d = time.perf_counter() self._waiting_for_ack = False self.latency = d - self.sent_at def get_payload(self): self.integer_nonce += 1 return {"op": 3, "d": self.integer_nonce} def end(self): self._ended = True