# Voice websocket connection
from __future__ import annotations
from asyncio import AbstractEventLoop, Event
import asyncio
from datetime import datetime
from aiohttp import WSMsgType
# For handling voice packets
from struct import pack_into, pack
from acord.core.heartbeat import VoiceKeepAlive
from acord.errors import VoiceError
from .udp import UDPConnection
from .codes import OpCodes
try:
import nacl.secret
_nacl = True
except ImportError:
_nacl = False
from acord.bases import _C
import logging
global CONNECTIONS
CONNECTIONS = 0
logger = logging.getLogger(__name__)
[docs]class VoiceConnection(object):
"""Represents a voice connection between client and a discord voice channel
Attributes
----------
channel_id: :class:`Snowflake`
ID of channel
guild_id: :class:`Snowflake`
ID of guild
sequence: :class:`int`
Websocket sequence
timestamp: :class:`int`
Timestamp for next RTC header
ssrc: :class:`int`
SSRC for the connection
mode: :class:`str`
The chosen mode for encrypting the packets in
disconnected: :class:`bool`
Whether this connection is disconnected
"""
supported_modes = (
"xsalsa20_poly1305_lite",
"xsalsa20_poly1305_suffix",
"xsalsa20_poly1305",
)
def __init__(
self, voice_packet: dict, loop: AbstractEventLoop, client, channel_id, **kwargs
) -> None:
if _nacl is False:
raise VoiceError("PyNaCl must be installed before using voice")
# Use existing session instead of creating a new one
self._session = client.http._session
self._packet = voice_packet
self._connect = False
self._loop = loop
self._client = client
self.channel_id = channel_id
self._ws = None
self._keep_alive = None
self._ready_packet = None
self._sock = None
self._listener = None
self._resume_kwargs = {}
self.sequence: int = 0
self.timestamp: int = 0
self.timeout: float = 0
self.ssrc: int = 0
self._lite_nonce: int = 0
self._decode_key: list = None
self.mode = None
self.connect_event = Event()
self.send_event = Event()
self.disconnected = False
self.acked_at = 0
self.ping = float("inf")
[docs] async def wait_until_connected(self):
"""|coro|
Waits until the voice connection is connected """
await self.connect_event.wait()
[docs] async def wait_until_ready(self):
"""|coro|
Waits until the voice connection is ready """
await self.wait_until_connected()
await self.send_event.wait()
[docs] async def connect(self, *, v: int = 6) -> None:
"""|coro|
Creates a websocket connection to the endpoint provided by discord
Parameters
----------
v: :class:`int`
Endpoint version
"""
global CONNECTIONS
# connects to desired endpoint creating new websocket connection
logger.debug(f"Attempting to connect to {self._packet['d']['endpoint']}")
ws = await self._session.ws_connect(
f"wss://{self._packet['d']['endpoint']}?v={v}"
)
CONNECTIONS += 1
self._conn_id = CONNECTIONS
logger.info(
f"Successfully connected to {self._packet['d']['endpoint']}, awaiting UDP handshake"
)
self._ws = ws
self.disconnected = False
[docs] async def disconnect(self, *, message: bytes = b"") -> None:
"""|coro|
Disconnects the voice connection closing both UDP and WS connection,
it also terminates the task generated by :meth:`VoiceConnection.listen`
"""
if self.disconnected:
logger.warn(
f"Disconnect called on disconnected socket, conn_id={self._conn_id}"
)
return
if not self._keep_alive:
raise ConnectionError("Keepalive doesn't exist, failed to disconnect ws")
logger.debug(
f"Client disconnected from VC conn_id={self._conn_id}, ending operations"
)
self._keep_alive.end()
try:
await self._ws.close(code=4000, message=message)
except Exception:
pass
# WS already closed or anything along them lines
# Disconnect called before sock was initialised
if self._sock:
sock = getattr(self._sock, "_sock", f"UDP Socket conn_id={self._conn_id}")
await self._sock.close()
logger.info(f"Disconnected from {sock}")
self._ws = None
self._sock = None
self._keep_alive = None
if self._listener:
self._listener.cancel("Disconnect called to end conn")
self._listener = None
logger.debug("Ended listener task")
logger.info("Disconnected from voice, Closed ws & socket and ended heartbeats")
self.disconnected = True
[docs] async def resume(self) -> None:
"""|coro|
Resumes the websocket connection for the vc,
this should rarely be called by the user.
"""
# Magic of resuming websocket connections
# Reconnect
await self.disconnect()
await self.connect()
# Set no identity to True so we can resume
self._resume_kwargs.update({"no_identity": True})
await self.listen(**self._resume_kwargs)
# Resume normally
await self._ws.send_json(
{
"op": 7,
"d": {
"server_id": self._packet["d"]["guild_id"],
"session_id": self._packet["d"]["session_id"],
"token": self._packet["d"]["token"],
},
}
)
# Log the result
logger.info(f"Resuming session for conn_id={self._conn_id}")
[docs] def identity(self, *, video: bool = False):
"""Returns the identity packet for voice"""
return {
"op": OpCodes.IDENTITY.value,
"d": {
"server_id": self._packet["d"]["guild_id"],
"user_id": self._packet["d"]["user_id"],
"session_id": self._packet["d"]["session_id"],
"token": self._packet["d"]["token"],
"video": video,
},
}
[docs] def udp_payload(self, *, mode: str = None):
"""Returns the payload for the UDP select packet"""
if not mode:
mode = self.supported_modes[0]
if mode not in self.supported_modes:
raise ValueError("Encountered unknown mode")
self.mode = mode
return {
"op": OpCodes.SELECT_PROTOCOL,
"d": {
"protocol": "udp",
"data": {
"address": self._ready_packet["d"]["ip"],
"port": self._ready_packet["d"]["port"],
"mode": mode,
},
},
}
def checked_add(self, attr, value, limit):
val = getattr(self, attr)
if (val + value) > limit:
setattr(self, attr, 0)
else:
setattr(self, attr, (val + value))
[docs] async def upd_connect(self, addr: str, port: int, **kwargs) -> None:
"""|coro|
Connects to the UDP port,
spawns a new :class:`UDPConnection`,
which can be accessed by ``_sock``.
Parameters
----------
addr: :class:`str`
IP of the UDP port
port: :class:`int`
Connection port
**kwargs:
Additional arguments to be passed to the UDPConnection
"""
# Finishes handshake whilst connected to vc
# self._sock will be a tuple with the transport and protocol
logger.debug(
f"Attempting to complete UDP connection for conn_id={self._conn_id}"
)
conn = UDPConnection(
self._ready_packet["d"]["ip"],
self._ready_packet["d"]["port"],
self._loop,
**kwargs,
)
await conn.connect()
logger.info(
f"Successfully connected to {addr}:{port} for conn_id={self._conn_id}"
)
self._sock = conn
def _get_audio_packet(self, data: bytes) -> bytes:
header = bytearray(12)
header[0] = 0x80
header[1] = 0x78
pack_into(">H", header, 2, self.sequence)
pack_into(">I", header, 4, self.timestamp)
pack_into(">I", header, 8, self.ssrc)
encrypter = getattr(self, f"_encrypt_{self.mode}")
return encrypter(header, data)
[docs] async def send_audio_packet(
self,
data: bytes,
frames: int,
*,
has_header: bool = False,
sock_flags: int = 0,
) -> None:
"""|coro|
Sends an audio packet to discord
Parameters
----------
data: :class:`bytes`
Bytes of data to send to discord
has_header: :class:`bool`
Whether the data has an RTC header attached to it.
Defaults to False and should only be True if you know what your doing.
frames: :class:`int`
Your encoders SAMPLES_PER_FRAME value,
used for generating timestamp
sock_flags: :class:`int`
Additional flags for the UDP socket
"""
self.checked_add("sequence", 1, 65535)
if not has_header:
data = self._get_audio_packet(data)
await self._sock.write(data, flags=sock_flags)
self.checked_add("timestamp", frames, 4294967295)
[docs] async def client_connect(self) -> None:
"""|coro|
Sends a OP 12 payload through the ws.
"""
await self._ws.send_json(
{
"op": OpCodes.VIDEO.value,
"d": {
"audio_ssrc": self.ssrc,
"rtx_ssrc": 0,
"streams": [{"type": "video", "active": False}],
"video_ssrc": 0,
},
}
)
logger.info(f"Sent ssrc for conn_id={self._conn_id}")
[docs] async def change_speaking_state(self, flags: int = 1, delay: int = 0) -> None:
"""|coro|
Changes the speaking state of the client
Parameters
----------
flags: :class:`int`
Speaking flag, defaults to 1
delay: :class:`int`
Delay before client speaks
"""
if not self.ssrc:
raise VoiceError("Cannot update speaking state, no ssrc set")
payload = {
"op": OpCodes.SPEAKING.value,
"d": {"speaking": flags, "delay": delay, "ssrc": self.ssrc},
}
await self._ws.send_json(payload)
logger.info(
f"Updated speaking state for conn_id={self._conn_id}, payload:\n{payload}"
)
[docs] async def stop_speaking(self) -> None:
"""|coro|
Shortcut method to :meth:`VoiceConnection.change_speaking_state`,
tells discord that the client has stopped speaking
"""
# Stops speaking indicator, should be called after audio transmission
await self.change_speaking_state(0, 0)
logger.info(f"Client speaking indicator removed for conn_id={self._conn_id}")
[docs] async def listen(self, **kwargs) -> None:
"""Begins to listen for websocket events,
to terminate this simply end generated task"""
tsk = self._loop.create_task(self._handle_voice(**kwargs))
self._listener = tsk
self._resume_kwargs = kwargs
async def _handle_voice(
self, *, after: _C = None, no_identity: bool = False, **kwargs
) -> None:
"""Handles incoming data from websocket"""
if not self._ws:
raise ValueError("Not established websocket connecting")
if not no_identity:
# Resume is doing its thing
await self._ws.send_json(self.identity())
logger.info(f"Sent identity packet for voice ws conn_id={self._conn_id}")
while True:
try:
message = await self._ws.receive()
except ConnectionResetError:
break
try:
data = message.json()
except TypeError:
# Handling any errors we don't like >:D
if self._ws._close_code == 4015:
await self.resume()
elif self._ws._close_code == 4014:
pass
elif message.type in (WSMsgType.CLOSED, WSMsgType.CLOSING):
logger.warn(
f"Voice WS closed for conn_id={self._conn_id}, disconnecting shortly"
)
elif message.type == WSMsgType.ERROR:
logger.error(
f"Voice WS for conn_id={self._conn_id} has closed",
exc_info=(
type(message.extra),
message.extra,
message.extra.__traceback__,
),
)
else:
logger.info(
f"Received invalid json data for voice ws conn_id={self._conn_id},\
closing ws, code={self._ws._close_code}"
)
await self.disconnect()
break
except (asyncio.CancelledError, asyncio.TimeoutError):
logger.warn(
f"Voice WS adruptly closed for conn_id={self._conn_id}, ending connection"
)
await self.disconnect()
break
if data["op"] == OpCodes.HELLO.value:
self._keep_alive = VoiceKeepAlive(self, data)
self._keep_alive.start()
elif data["op"] == OpCodes.READY.value:
self._ready_packet = data
await self.upd_connect(
data["d"]["ip"],
data["d"]["port"],
client=self._client,
vc_Ws=self,
conn_id=self._conn_id,
)
udp_payload = self.udp_payload(**kwargs)
await self._ws.send_json(udp_payload)
logger.info(f"Sent select payload for conn_id={self._conn_id}")
self.connect_event.set()
self.ssrc = data["d"]["ssrc"]
if after:
await after()
elif data["op"] == OpCodes.SELECT_PROTOCOL_ACK.value:
self._decode_key = data["d"]["secret_key"]
self.send_event.set()
elif data["op"] == OpCodes.CLIENT_DISCONNECT.value:
await self.disconnect()
break
elif data["op"] == OpCodes.SPEAKING.value:
print(data)
await self._client.dispatch(
"voice_channel_speak", self.channel_id, data["d"]["user_id"]
)
elif data["op"] == OpCodes.HEARTBEAT_ACK.value:
self.ping = datetime.utcnow().timestamp() - self.acked_at
# else:
# print(data["op"])
# print(data)
# NOTE: encryption methods
def _encrypt_xsalsa20_poly1305(self, header: bytes, data) -> bytes:
box = nacl.secret.SecretBox(bytes(self._decode_key))
nonce = bytearray(24)
nonce[:12] = header
return header + box.encrypt(bytes(data), bytes(nonce)).ciphertext
def _encrypt_xsalsa20_poly1305_suffix(self, header: bytes, data) -> bytes:
box = nacl.secret.SecretBox(bytes(self._decode_key))
nonce = nacl.utils.random(nacl.secret.SecretBox.NONCE_SIZE)
return header + box.encrypt(bytes(data), nonce).ciphertext + nonce
def _encrypt_xsalsa20_poly1305_lite(self, header: bytes, data) -> bytes:
box = nacl.secret.SecretBox(bytes(self._decode_key))
nonce = bytearray(24)
nonce[:4] = pack(">I", self._lite_nonce)
self.checked_add("_lite_nonce", 1, 4294967295)
return header + box.encrypt(bytes(data), bytes(nonce)).ciphertext + nonce[:4]
# NOTE: properties and what not
@property
def guild_id(self) -> str:
return self._packet["d"]["guild_id"]