Source code for acord.voice.transports.writer

from __future__ import annotations
from typing import Any, Union

from io import BufferedIOBase
from os import PathLike
import asyncio

from acord.errors import VoiceError
from acord.bases import File
from .base import BaseTransport

try:
    from acord.voice.core import VoiceConnection
except ImportError:
    VoiceConnection = None
try:
    from acord.voice.opus import Encoder
except ImportError:
    Encoder = None


def getFrameDur(x, y):
    # x -> pcm
    # y -> sampling rate
    return ((x * 10) // (y // 1000)) / 1000


[docs]class BasePlayer(BaseTransport): def __init__( self, conn: VoiceConnection, data: Union[BufferedIOBase, PathLike], encoder: Encoder = None, **encoder_kwargs ) -> None: if isinstance(data, File): data = data.fp if not isinstance(data, BufferedIOBase): self.fp = open(data, "rb") self.pos = 0 else: self.fp = data self.pos = data.tell() self.index = 0 self.closed = False self.encoder = encoder if not self.encoder: self.encoder = Encoder(**encoder_kwargs) self._last_pack_err = None self._last_send_err = None super().__init__(conn) @property def packet_size(self): return self.encoder.config.FRAME_SIZE def __aiter__(self): if self.closed: raise VoiceError( "Cannot iterate through transport contents as transport is closed" ) return self async def __anext__(self): try: data = self.get_next_packet() except EOFError: raise StopAsyncIteration return data def get_next_packet(self): self.index += 1 data = self.fp.read(self.packet_size) if data == b"": self.close() raise EOFError("Reached end of file") self.pos = self.fp.tell() return data
[docs] def close(self) -> None: if self.closed: raise VoiceError("Transport already closed") self.fp.close() self.closed = True
[docs] async def cleanup(self) -> None: self.fp.seek(0) self._last_pack_err = None self._last_send_err = None self.index = 0
[docs] async def send(self, data: bytes, *, flags: int = 0) -> None: if self.closed: raise VoiceError( "Cannot send bytes through transport as transport is closed" ) encoded_packet = await self.encoder.encode(data) try: # encoded_packet is a memoryview object # fine to pass through socket as its classed as a WriteOnlyBuffer await self.conn.send_audio_packet( encoded_packet, len(encoded_packet), has_header=False, sock_flags=flags, ) except OSError as exc: self.close() self._last_send_err = exc raise VoiceError( "Cannot send bytes through transport", closed=True ) from exc
async def play( self, c_flags: int = 1, delay: int = 0, *, flags: int = 0 ) -> Union[None, int]: await self.conn.change_speaking_state(c_flags, delay) async for packet in self: try: try: await self.send(data=packet, flags=flags) except AttributeError: # Socket closed return 1 await asyncio.sleep(getFrameDur(len(packet), self.encoder.config.SAMPLING_RATE)) except VoiceError as err: if getattr(err, "closed", False): return else: raise await self.conn.stop_speaking() async def __aenter__(self): return self async def __aexit__(self, *args, **kwargs): try: self.close() except VoiceError: return