Source code for acord.voice.core

# Voice websocket connection
from __future__ import annotations

from asyncio import AbstractEventLoop, Event
import asyncio
from datetime import datetime
from aiohttp import  WSMsgType

# For handling voice packets
from struct import pack_into, pack
from acord.core.heartbeat import VoiceKeepAlive
from acord.errors import VoiceError
from .udp import UDPConnection
from .codes import OpCodes

try:
    import nacl.secret
    _nacl = True
except ImportError:
    _nacl = False

from acord.bases import _C

import logging

# Running count of voice websocket connections opened by this module.
# ``VoiceConnection.connect`` increments it and uses the new value as the
# connection's ``conn_id`` in log messages.  (A ``global`` statement is only
# meaningful inside a function, so none is needed here.)
CONNECTIONS = 0
logger = logging.getLogger(__name__)


class VoiceConnection(object):
    """Represents a voice connection between client and a discord voice channel

    Attributes
    ----------
    channel_id: :class:`Snowflake`
        ID of channel
    guild_id: :class:`Snowflake`
        ID of guild
    sequence: :class:`int`
        Websocket sequence
    timestamp: :class:`int`
        Timestamp for next RTC header
    ssrc: :class:`int`
        SSRC for the connection
    mode: :class:`str`
        The chosen mode for encrypting the packets in
    disconnected: :class:`bool`
        Whether this connection is disconnected
    """

    # Encryption modes this class has an ``_encrypt_<mode>`` method for.
    # The first entry is the default picked by :meth:`udp_payload`.
    supported_modes = (
        "xsalsa20_poly1305_lite",
        "xsalsa20_poly1305_suffix",
        "xsalsa20_poly1305",
    )

    def __init__(
        self, voice_packet: dict, loop: AbstractEventLoop, client, channel_id, **kwargs
    ) -> None:
        """Store the connection parameters; no network I/O happens here.

        Parameters
        ----------
        voice_packet: :class:`dict`
            Payload whose ``"d"`` mapping is read for ``endpoint``,
            ``guild_id``, ``user_id``, ``session_id`` and ``token``
        loop: :class:`~asyncio.AbstractEventLoop`
            Loop used to spawn the listener task and the UDP connection
        client:
            Client whose ``http._session`` is reused for the websocket
        channel_id:
            ID of the voice channel
        **kwargs:
            Accepted for compatibility, currently unused

        Raises
        ------
        VoiceError
            PyNaCl could not be imported
        """
        if _nacl is False:
            raise VoiceError("PyNaCl must be installed before using voice")

        # Use existing session instead of creating a new one
        self._session = client.http._session
        self._packet = voice_packet
        self._connect = False
        self._loop = loop
        self._client = client
        self.channel_id = channel_id

        self._ws = None
        self._keep_alive = None
        self._ready_packet = None
        self._sock = None
        self._listener = None
        self._resume_kwargs = {}
        # Assigned by ``connect``; defined up-front so log messages emitted
        # before a connection exists do not fail with AttributeError.
        self._conn_id = None

        self.sequence: int = 0
        self.timestamp: int = 0
        self.timeout: float = 0
        self.ssrc: int = 0
        self._lite_nonce: int = 0
        self._decode_key: list = None
        self.mode = None

        self.connect_event = Event()
        self.send_event = Event()
        self.disconnected = False

        self.acked_at = 0
        self.ping = float("inf")

    async def wait_until_connected(self):
        """|coro|

        Waits until the voice connection is connected
        """
        await self.connect_event.wait()

    async def wait_until_ready(self):
        """|coro|

        Waits until the voice connection is ready
        """
        await self.wait_until_connected()
        await self.send_event.wait()

    async def connect(self, *, v: int = 6) -> None:
        """|coro|

        Creates a websocket connection to the endpoint provided by discord

        Parameters
        ----------
        v: :class:`int`
            Endpoint version
        """
        global CONNECTIONS

        endpoint = self._packet["d"]["endpoint"]

        # connects to desired endpoint creating new websocket connection
        logger.debug(f"Attempting to connect to {endpoint}")
        ws = await self._session.ws_connect(f"wss://{endpoint}?v={v}")

        CONNECTIONS += 1
        self._conn_id = CONNECTIONS

        logger.info(f"Successfully connected to {endpoint}, awaiting UDP handshake")

        self._ws = ws
        self.disconnected = False

    async def disconnect(self, *, message: bytes = b"") -> None:
        """|coro|

        Disconnects the voice connection closing both UDP and WS connection,
        it also terminates the task generated by :meth:`VoiceConnection.listen`

        Parameters
        ----------
        message: :class:`bytes`
            Message passed along with the websocket close frame

        Raises
        ------
        ConnectionError
            No keep-alive exists for this connection
        """
        if self.disconnected:
            # ``Logger.warn`` was removed in Python 3.13; use ``warning``.
            logger.warning(
                f"Disconnect called on disconnected socket, conn_id={self._conn_id}"
            )
            return

        if not self._keep_alive:
            raise ConnectionError("Keepalive doesn't exist, failed to disconnect ws")

        logger.debug(
            f"Client disconnected from VC conn_id={self._conn_id}, ending operations"
        )
        self._keep_alive.end()

        try:
            await self._ws.close(code=4000, message=message)
        except Exception:
            # Best effort: the WS may already be closed.  Keep going so the
            # UDP socket and listener are still cleaned up, but leave a trace.
            logger.debug(
                f"Ignoring error while closing voice ws conn_id={self._conn_id}",
                exc_info=True,
            )

        # Disconnect may be called before the UDP socket was initialised
        if self._sock:
            sock = getattr(self._sock, "_sock", f"UDP Socket conn_id={self._conn_id}")
            await self._sock.close()
            logger.info(f"Disconnected from {sock}")

        self._ws = None
        self._sock = None
        self._keep_alive = None

        if self._listener:
            self._listener.cancel("Disconnect called to end conn")
            self._listener = None
            logger.debug("Ended listener task")

        logger.info("Disconnected from voice, Closed ws & socket and ended heartbeats")
        self.disconnected = True

    async def resume(self) -> None:
        """|coro|

        Resumes the websocket connection for the vc,
        this should rarely be called by the user.
        """
        # NOTE(review): ``disconnect`` cancels the listener task.  When this
        # coroutine is awaited from inside that task (``_handle_voice`` does
        # so on close code 4015) the cancellation targets the running task,
        # so the steps below may be interrupted - confirm intended behaviour.

        # Reconnect
        await self.disconnect()
        await self.connect()

        # Set no identity to True so we can resume
        self._resume_kwargs.update({"no_identity": True})
        await self.listen(**self._resume_kwargs)

        # Resume normally
        await self._ws.send_json(
            {
                "op": 7,
                "d": {
                    "server_id": self._packet["d"]["guild_id"],
                    "session_id": self._packet["d"]["session_id"],
                    "token": self._packet["d"]["token"],
                },
            }
        )

        logger.info(f"Resuming session for conn_id={self._conn_id}")

    def identity(self, *, video: bool = False):
        """Returns the identity packet for voice"""
        details = self._packet["d"]
        return {
            "op": OpCodes.IDENTITY.value,
            "d": {
                "server_id": details["guild_id"],
                "user_id": details["user_id"],
                "session_id": details["session_id"],
                "token": details["token"],
                "video": video,
            },
        }

    def udp_payload(self, *, mode: str = None):
        """Returns the payload for the UDP select packet

        Also records the chosen mode on ``self.mode``.

        Parameters
        ----------
        mode: :class:`str`
            One of :attr:`supported_modes`, defaults to the first entry

        Raises
        ------
        ValueError
            ``mode`` is not one of :attr:`supported_modes`
        """
        if not mode:
            mode = self.supported_modes[0]

        if mode not in self.supported_modes:
            raise ValueError("Encountered unknown mode")

        self.mode = mode
        ready = self._ready_packet["d"]

        return {
            # ``.value`` keeps this consistent with every other payload and
            # guarantees a JSON-serialisable opcode.
            "op": OpCodes.SELECT_PROTOCOL.value,
            "d": {
                "protocol": "udp",
                "data": {
                    "address": ready["ip"],
                    "port": ready["port"],
                    "mode": mode,
                },
            },
        }

    def checked_add(self, attr, value, limit):
        """Add ``value`` to the attribute named ``attr``, resetting it to 0
        when the sum would exceed ``limit``."""
        total = getattr(self, attr) + value
        setattr(self, attr, 0 if total > limit else total)

    async def upd_connect(self, addr: str, port: int, **kwargs) -> None:
        """|coro|

        Connects to the UDP port,
        spawns a new :class:`UDPConnection`,
        which can be accessed by ``_sock``.

        Parameters
        ----------
        addr: :class:`str`
            IP of the UDP port
        port: :class:`int`
            Connection port
        **kwargs:
            Additional arguments to be passed to the UDPConnection
        """
        logger.debug(
            f"Attempting to complete UDP connection for conn_id={self._conn_id}"
        )

        # Connect to the address the caller asked for, so the target matches
        # the one reported in the log line below.
        conn = UDPConnection(addr, port, self._loop, **kwargs)
        await conn.connect()

        logger.info(
            f"Successfully connected to {addr}:{port} for conn_id={self._conn_id}"
        )

        self._sock = conn

    def _get_audio_packet(self, data: bytes) -> bytes:
        """Prefix ``data`` with a 12-byte header and encrypt it with the
        ``_encrypt_<mode>`` method matching ``self.mode``."""
        header = bytearray(12)
        header[0] = 0x80
        header[1] = 0x78
        # Big-endian: 16-bit sequence, 32-bit timestamp, 32-bit ssrc
        pack_into(">H", header, 2, self.sequence)
        pack_into(">I", header, 4, self.timestamp)
        pack_into(">I", header, 8, self.ssrc)

        encrypter = getattr(self, f"_encrypt_{self.mode}")
        return encrypter(header, data)

    async def send_audio_packet(
        self,
        data: bytes,
        frames: int,
        *,
        has_header: bool = False,
        sock_flags: int = 0,
    ) -> None:
        """|coro|

        Sends an audio packet to discord

        Parameters
        ----------
        data: :class:`bytes`
            Bytes of data to send to discord
        has_header: :class:`bool`
            Whether the data has an RTC header attached to it.
            Defaults to False and should only be True if you know what you're doing.
        frames: :class:`int`
            Your encoders SAMPLES_PER_FRAME value,
            used for generating timestamp
        sock_flags: :class:`int`
            Additional flags for the UDP socket
        """
        # The sequence is bumped before the header is built, the timestamp
        # only after the packet has been written.
        self.checked_add("sequence", 1, 65535)

        if not has_header:
            data = self._get_audio_packet(data)

        await self._sock.write(data, flags=sock_flags)

        self.checked_add("timestamp", frames, 4294967295)

    async def client_connect(self) -> None:
        """|coro|

        Sends a OP 12 payload through the ws.
        """
        await self._ws.send_json(
            {
                "op": OpCodes.VIDEO.value,
                "d": {
                    "audio_ssrc": self.ssrc,
                    "rtx_ssrc": 0,
                    "streams": [{"type": "video", "active": False}],
                    "video_ssrc": 0,
                },
            }
        )
        logger.info(f"Sent ssrc for conn_id={self._conn_id}")

    async def change_speaking_state(self, flags: int = 1, delay: int = 0) -> None:
        """|coro|

        Changes the speaking state of the client

        Parameters
        ----------
        flags: :class:`int`
            Speaking flag, defaults to 1
        delay: :class:`int`
            Delay before client speaks

        Raises
        ------
        VoiceError
            No ssrc has been set for this connection
        """
        if not self.ssrc:
            raise VoiceError("Cannot update speaking state, no ssrc set")

        payload = {
            "op": OpCodes.SPEAKING.value,
            "d": {"speaking": flags, "delay": delay, "ssrc": self.ssrc},
        }

        await self._ws.send_json(payload)
        logger.info(
            f"Updated speaking state for conn_id={self._conn_id}, payload:\n{payload}"
        )

    async def stop_speaking(self) -> None:
        """|coro|

        Shortcut method to :meth:`VoiceConnection.change_speaking_state`,
        tells discord that the client has stopped speaking
        """
        # Stops speaking indicator, should be called after audio transmission
        await self.change_speaking_state(0, 0)
        logger.info(f"Client speaking indicator removed for conn_id={self._conn_id}")

    async def listen(self, **kwargs) -> None:
        """Begins to listen for websocket events,
        to terminate this simply end generated task

        ``kwargs`` are forwarded to the handler and remembered so that
        :meth:`resume` can restart the listener with the same options.
        """
        self._listener = self._loop.create_task(self._handle_voice(**kwargs))
        self._resume_kwargs = kwargs

    async def _handle_voice(
        self, *, after: "_C" = None, no_identity: bool = False, **kwargs
    ) -> None:
        """Handles incoming data from websocket

        Parameters
        ----------
        after:
            Optional coroutine function awaited once the READY payload
            has been processed
        no_identity: :class:`bool`
            Skip sending the identity payload (set by :meth:`resume`)
        **kwargs:
            Forwarded to :meth:`udp_payload`

        Raises
        ------
        ValueError
            No websocket has been established
        """
        if not self._ws:
            raise ValueError("Not established websocket connecting")

        if not no_identity:
            # Skipped while resuming: ``resume`` sends its own payload instead
            await self._ws.send_json(self.identity())
            logger.info(f"Sent identity packet for voice ws conn_id={self._conn_id}")

        while True:
            try:
                message = await self._ws.receive()
            except ConnectionResetError:
                break

            try:
                data = message.json()
            except (TypeError, ValueError):
                # TypeError: frame carries no JSON text (close/error frames).
                # ValueError: text frame whose body is not valid JSON.
                close_code = self._ws._close_code

                # NOTE(review): 4015 / 4014 are presumably Discord voice close
                # codes ("server crashed, resume" / "disconnected, do not
                # reconnect") - confirm against the Discord documentation.
                if close_code == 4015:
                    await self.resume()
                elif close_code == 4014:
                    pass
                elif message.type in (WSMsgType.CLOSED, WSMsgType.CLOSING):
                    logger.warning(
                        f"Voice WS closed for conn_id={self._conn_id}, disconnecting shortly"
                    )
                elif message.type == WSMsgType.ERROR:
                    # aiohttp carries the exception in ``data`` for ERROR
                    # messages; fall back to ``extra`` and tolerate neither
                    # being an exception.
                    error = message.data
                    if not isinstance(error, BaseException):
                        error = message.extra
                    logger.error(
                        f"Voice WS for conn_id={self._conn_id} has closed",
                        exc_info=(
                            (type(error), error, error.__traceback__)
                            if isinstance(error, BaseException)
                            else None
                        ),
                    )
                else:
                    logger.info(
                        f"Received invalid json data for voice ws conn_id={self._conn_id},"
                        f" closing ws, code={close_code}"
                    )

                await self.disconnect()
                break
            except (asyncio.CancelledError, asyncio.TimeoutError):
                # NOTE(review): ``message.json()`` is called synchronously, so
                # this handler looks unreachable; kept as a defensive guard.
                logger.warning(
                    f"Voice WS adruptly closed for conn_id={self._conn_id}, ending connection"
                )
                await self.disconnect()
                break

            op = data["op"]

            if op == OpCodes.HELLO.value:
                self._keep_alive = VoiceKeepAlive(self, data)
                self._keep_alive.start()

            elif op == OpCodes.READY.value:
                self._ready_packet = data

                await self.upd_connect(
                    data["d"]["ip"],
                    data["d"]["port"],
                    client=self._client,
                    vc_Ws=self,
                    conn_id=self._conn_id,
                )

                udp_payload = self.udp_payload(**kwargs)
                await self._ws.send_json(udp_payload)
                logger.info(f"Sent select payload for conn_id={self._conn_id}")

                # Publish the ssrc before waking anything waiting on the event
                self.ssrc = data["d"]["ssrc"]
                self.connect_event.set()

                if after:
                    await after()

            elif op == OpCodes.SELECT_PROTOCOL_ACK.value:
                self._decode_key = data["d"]["secret_key"]
                self.send_event.set()

            elif op == OpCodes.CLIENT_DISCONNECT.value:
                await self.disconnect()
                break

            elif op == OpCodes.SPEAKING.value:
                logger.debug(
                    f"Received speaking payload for conn_id={self._conn_id}: {data}"
                )
                await self._client.dispatch(
                    "voice_channel_speak", self.channel_id, data["d"]["user_id"]
                )

            elif op == OpCodes.HEARTBEAT_ACK.value:
                # NOTE(review): ``utcnow().timestamp()`` interprets the naive
                # value as local time; left unchanged because ``acked_at`` is
                # written elsewhere and must use the same clock - confirm.
                self.ping = datetime.utcnow().timestamp() - self.acked_at

    # NOTE: encryption methods

    def _encrypt_xsalsa20_poly1305(self, header: bytes, data) -> bytes:
        """Encrypt ``data`` using the header, zero-padded to 24 bytes, as nonce."""
        box = nacl.secret.SecretBox(bytes(self._decode_key))
        nonce = bytearray(24)
        nonce[:12] = header

        return header + box.encrypt(bytes(data), bytes(nonce)).ciphertext

    def _encrypt_xsalsa20_poly1305_suffix(self, header: bytes, data) -> bytes:
        """Encrypt ``data`` with a random nonce appended to the packet."""
        # Imported explicitly: the module only does ``import nacl.secret`` and
        # should not rely on ``nacl.utils`` being loaded as a side effect.
        import nacl.utils

        box = nacl.secret.SecretBox(bytes(self._decode_key))
        nonce = nacl.utils.random(nacl.secret.SecretBox.NONCE_SIZE)

        return header + box.encrypt(bytes(data), nonce).ciphertext + nonce

    def _encrypt_xsalsa20_poly1305_lite(self, header: bytes, data) -> bytes:
        """Encrypt ``data`` with an incrementing 32-bit nonce appended to the packet."""
        box = nacl.secret.SecretBox(bytes(self._decode_key))
        nonce = bytearray(24)

        nonce[:4] = pack(">I", self._lite_nonce)
        self.checked_add("_lite_nonce", 1, 4294967295)

        return header + box.encrypt(bytes(data), bytes(nonce)).ciphertext + nonce[:4]

    # NOTE: properties and what not

    @property
    def guild_id(self) -> str:
        """ID of the guild, read from the voice packet."""
        return self._packet["d"]["guild_id"]