Source code for acord.client.shard

# Represents a shard
# Client will normally run off a single shard
from __future__ import annotations
import asyncio
import sys
from typing import Any, Callable, Coroutine
import logging


from acord.errors import GatewayError
from acord.models import Snowflake

from acord.core.signals import gateway
from acord.core.decoders import decodeResponse
from acord.core.heartbeat import GatewayKeepAlive

from acord.payloads import (
    GenericWebsocketPayload,
    VoiceStateUpdatePresence,
)
from acord.bases import Presence

from .handler import handle_websocket
from .ratelimiter import GatewayRatelimiter

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)


# Connection properties reported in the identity packet.
_IDENTITY_PROPERTIES = {
    "$os": sys.platform,
    "$browser": "acord",
    "$device": "acord",
    "$referrer": None,
    "$referring_domain": None,
}

# Static part of the identity packet. ``Shard.send_identity`` copies this
# mapping and merges in the per-connection fields (token, intents, presence
# and shard) before sending it.
IDENTITY_PCK = {
    "properties": _IDENTITY_PROPERTIES,
    "compress": True,
    "large_threshold": 250,
}


class Shard:
    """Representation of a discord shard,
    which is basically a connection to the gateway.

    .. warning::
        Shards are enabled by default but only Shard 0 will receive dms

    .. note::
        When providing a handler,
        it must take care of reading the websocket,
        else see the implementation below.

    .. rubric:: Working with a shard

    .. code-block:: py

        shard = Shard("url", shard_id, num_shards, client)

        # Connect
        await shard.connect()
        await shard.receive_hello()
        await shard.send_identity(token, intents)
        await shard.wait_until_ready()

        # Deal with messages
        async for message in shard.ws:
            ...

    Parameters
    ----------
    url: :class:`str`
        Gateway URL
    shard_id: :class:`int`
        ID of shard
    num_shards: :class:`int`
        Total number of shards client uses
    client: :class:`Client`
        Client this shard is attached to.
    handler: Callable[..., Coroutine[Any, Any, Any]]
        A handler to overwrite the default handler

    Attributes
    ----------
    url: :class:`str`
        Gateway url
    shard_id: :class:`int`
        ID of shard
    num_shards: :class:`int`
        Total number of shards client has
    client: :class:`Client`
        Client shard is attached to
    session: :class:`~aiohttp.ClientSession`
        Session being used to make requests
    handler: Callable[..., Coroutine[Any, Any, Any]]
        Handler to be used when :meth:`Shard.listen` is called
    ws: :class:`~aiohttp.ClientWebSocketResponse`
        WS connected to :attr:`Shard.url`,
        only available after :meth:`Shard.connect` is called
    ready_event: :obj:`py:asyncio.Event`
        An event object which is set after shard receives READY
    loop: :obj:`py:asyncio.AbstractEventLoop`
        Loop that the shard is using
    sequence: :class:`int`
        Shard sequence,
        value should not be changed by user else unexpected errors may occur
    session_id: :class:`str`
        Session ID, used for resuming. Do not change it.
    gateway_version: :class:`str`
        Gateway version client is using
    resuming: :class:`bool`
        Whether the shard is in a resuming state
    ratelimit_key: :class:`int`
        Ratelimit key used for bucket ratelimiting gateway requests
    """

    def __init__(
        self,
        url: str,
        shard_id: int,
        num_shards: int,
        client: Any,
        handler: Callable[..., Coroutine[Any, Any, Any]] = handle_websocket,
    ):
        self.url = url
        self.shard_id = shard_id
        self.num_shards = num_shards
        self.client = client
        # Reuse the HTTP client's underlying session for the websocket.
        self.session = client.http._session
        self.handler = handler

        # Populated by :meth:`connect`.
        self.ws = None
        self.ready_event = asyncio.Event()
        self.loop: asyncio.AbstractEventLoop = asyncio.get_event_loop()

        # The ratelimiter is shared through the client; register this shard
        # with it, replacing any previous registration.
        self.ratelimiter: GatewayRatelimiter = self.client.gateway_ratelimiter
        self.ratelimiter.add_shard(self, overwrite=True)

        # Session state, unknown until the gateway provides it.
        self.sequence = None
        self.session_id = None
        self.gateway_version = None
        self.resuming = False

    def contains_guild(self, guild_id: Snowflake, /) -> bool:
        """Whether ``guild_id`` maps onto this shard.

        The guild is mapped to a shard with
        ``(guild_id >> 22) % num_shards``.

        Parameters
        ----------
        guild_id: :class:`Snowflake`
            ID of the guild to check
        """
        return ((guild_id >> 22) % self.num_shards) == self.shard_id

    async def wait_until_ready(self) -> None:
        """|coro|

        Blocks until the shard is ready
        """
        # ``__init__`` stores the event as ``ready_event``; waiting on
        # ``_ready_event`` raised AttributeError.
        await self.ready_event.wait()

    async def connect(self, **kwds) -> None:
        """|coro|

        Connects to gateway

        Parameters
        ----------
        **kwds:
            Additional kwargs to be passed through ``ws_connect``
        """
        logger.debug(f"Attempting to create a connection for shard {self.shard_id}")

        self.ws = await self.session.ws_connect(self.url, **kwds)
        # Remembered so :meth:`resume` can reconnect with the same options.
        self._snd_kwds = kwds

        logger.info(f"Shard {self.shard_id} has connected successfully")

    async def receive_hello(self) -> None:
        """|coro|

        Receives the hello packet from discord and begins heartbeating,
        should be called directly after :meth:`Shard.connect`

        Raises
        ------
        GatewayError
            The first packet received did not carry the HELLO op code
        """
        logger.debug(f"Receiving hello packet for Shard {self.shard_id}")

        packet = await self.ws.receive()
        data = decodeResponse(packet.data)

        if data.get("op", 0) != gateway.HELLO:
            raise GatewayError("Invalid op code recieved")

        self._keep_alive = GatewayKeepAlive(
            self, data["d"]["heartbeat_interval"], self.loop
        )
        self._keep_alive.start()

        logger.info(f"Hello packet successfully received, beginning heartbeats for Shard {self.shard_id}")

    async def send_identity(self, token: str, intents: int, presence: Presence = None) -> None:
        """|coro|

        Sends an identity packet to discord

        Parameters
        ----------
        token: :class:`str`
            Bot token
        intents: :class:`int`
            Intents to send
        presence: :class:`Presence`
            An optional presence to update the client with
        """
        # Shallow copy is enough: only top-level keys are added below.
        idn = IDENTITY_PCK.copy()
        idn.update({
            "token": token,
            "intents": intents,
            "presence": presence,
            "shard": (self.shard_id, self.num_shards)
        })

        payload = GenericWebsocketPayload(
            op=gateway.IDENTIFY,
            d=idn
        )

        async with self.ratelimiter as lock:
            if lock.exceeded(self.ratelimit_key):
                await lock.hold_until_reset(self.ratelimit_key)

            lock.increment(self.ratelimit_key, lock_if_exceed=True)
            await self.ws.send_str(payload.json())

        logger.info(f"Sent identity packet for Shard {self.shard_id}")

    def listen(self, **kwds) -> asyncio.Task:
        """Generates task using handler,
        this task is automatically terminated by :meth:`Shard.disconnect`.

        .. note::
            Any kwargs you pass through are sent to the handler

        Returns
        -------
        :obj:`py:asyncio.Task`
            The task running the handler, also stored as ``Shard.task``
        """
        coro = self.handler(**kwds)
        self.task = self.loop.create_task(coro)

        return self.task

    async def disconnect(self) -> None:
        """|coro|

        Disconnects from this shard

        Safe to call on a shard that never finished connecting: steps whose
        state is missing (keep-alive, websocket, listener task) are skipped.
        """
        logger.info(f"Disconnecting from shard {self.shard_id}")

        # ``_keep_alive`` only exists once :meth:`receive_hello` has run.
        keep_alive = getattr(self, "_keep_alive", None)
        if keep_alive is not None:
            keep_alive._ended = True

        # ``ws`` is ``None`` until :meth:`connect` has run.
        if self.ws is not None:
            await self.ws.close(code=4000)

        # ``task`` only exists once :meth:`listen` has been called.
        task = getattr(self, "task", None)
        if task is not None:
            task.cancel(msg="Disconnect called")

    async def resume(self, *, restart: bool = False):
        """|coro|

        Sends a resume packet to discord

        Parameters
        ----------
        restart: :class:`bool`
            Whether to restart the session

        Returns
        -------
        :class:`~aiohttp.ClientWebSocketResponse`
            The websocket the resume packet was sent on
        """
        if restart:
            # Drop the current socket and reconnect with the kwargs given to
            # the last :meth:`connect`, then point the keep-alive at the new
            # socket.
            await self.ws.close(code=4000)
            await self.connect(**self._snd_kwds)
            self._keep_alive._ws = self.ws

        async with self.ratelimiter as lock:
            if lock.exceeded(self.ratelimit_key):
                await lock.hold_until_reset(self.ratelimit_key)

            lock.increment(self.ratelimit_key, lock_if_exceed=True)

            self.resuming = True
            await self.ws.send_json({
                "op": gateway.RESUME,
                "d": {
                    "token": self.client.token,
                    "session_id": self.session_id,
                    "seq": self.sequence
                }
            })

        return self.ws

    async def change_presence(self, presence: Presence) -> None:
        """|coro|

        Changes client presence

        Parameters
        ----------
        presence: :class:`Presence`
            New presence for client,
            You may want to checkout the guide for presences.
            Which can be found `here <../guides/presence.html>`_.
        """
        payload = GenericWebsocketPayload(op=gateway.PRESENCE, d=presence)

        logger.debug(f"Updating presence for shard {self.shard_id}")

        async with self.ratelimiter as lock:
            if lock.exceeded(self.ratelimit_key):
                await lock.hold_until_reset(self.ratelimit_key)

            lock.increment(self.ratelimit_key, lock_if_exceed=True)
            await self.ws.send_str(payload.json())

    async def update_voice_state(self, **data) -> None:
        """|coro|

        Updates client voice state

        Parameters
        ----------
        guild_id: :class:`Snowflake`
            id of the guild
        channel_id: :class:`Snowflake`
            id of the voice channel client wants to join
            (``None`` if disconnecting)
        self_mute: :class:`bool`
            is the client muted
        self_deaf: :class:`bool`
            is the client deafened
        """
        voice_payload = VoiceStateUpdatePresence(**data)
        payload = GenericWebsocketPayload(op=gateway.VOICE, d=voice_payload)

        async with self.ratelimiter as lock:
            if lock.exceeded(self.ratelimit_key):
                await lock.hold_until_reset(self.ratelimit_key)

            lock.increment(self.ratelimit_key, lock_if_exceed=True)
            await self.ws.send_str(payload.json())

    @property
    def ratelimit_key(self) -> int:
        """Key identifying this shard's bucket in the gateway ratelimiter."""
        return self.shard_id % self.num_shards

    def __repr__(self) -> str:
        # The shard's identifier is stored as ``shard_id``; there is no ``id``
        # attribute on this class.
        return f"Shard(id={self.shard_id}, running={self.ws is not None})"