from __future__ import annotations
from typing import Any, Dict, Iterator, List, Optional, Union
import datetime
import pydantic
from acord.core.abc import DISCORD_EPOCH, Route
from acord.models import Message, Snowflake
from acord.payloads import (
ChannelEditPayload,
InviteCreatePayload,
ThreadCreatePayload,
WebhookCreatePayload,
)
from acord.utils import _payload_dict_to_json
from acord.errors import APIObjectDepreciated
from acord.bases import PermissionsOverwrite
from acord.webhooks.webhook import Webhook
from .textExt import ExtendedTextMethods
from .base import Channel
from .thread import Thread
# Standard text channel in a guild
async def _pop_task(client, channel_id, *messages) -> None:
    """Evict bulk-deleted messages from the client's message cache.

    Scheduled as a background task after a bulk deletion request.

    Parameters
    ----------
    client:
        Object exposing ``cache.remove_message(channel_id, message_id)``.
    channel_id:
        ID of the channel the messages were deleted from; used for every
        entry that does not carry its own ``channel_id`` attribute.
    messages:
        Message objects or bare message IDs to drop from the cache.
    """
    for message in messages:
        # The bulk-delete caller passes plain IDs, which have neither ``.id``
        # nor ``.channel_id``; fall back to the value itself and to the
        # explicit ``channel_id`` argument in that case.
        message_id = getattr(message, "id", message)
        owner_channel_id = getattr(message, "channel_id", channel_id)
        client.cache.remove_message(owner_channel_id, message_id)
class TextChannel(Channel, ExtendedTextMethods):
    """Standard text channel in a guild.

    Wraps the channel's fields and exposes the REST operations available on
    it (editing, message history, bulk deletion, invites, webhooks, threads).
    All network calls go through ``self.conn.request``.
    """

    guild_id: int
    """ ID of guild where text channel belongs """
    position: int
    """ Text channel position """
    permission_overwrites: Optional[List[PermissionsOverwrite]] = list()
    """ Channel Permissions """
    name: str
    """ Name of channel """
    topic: Optional[str]
    """ Channel topic """
    nsfw: Optional[bool] = False
    """ Whether channel is marked as NSFW """
    last_message_id: Optional[int]
    """ Last message in channel, may or may not be valid """
    parent_id: Optional[int]
    """ Category to which the channel belongs to """
    last_pin_timestamp: Optional[datetime.datetime]
    """ Last pinned message in channel, may be None """
    permissions: Optional[str]
    """ String of user permissions """
    rate_limit_per_user: Optional[int]
    """ Channel ratelimit """
    default_auto_archive_duration: Optional[int]
    """ Default time for threads to be archived """

    created_at: Optional[datetime.datetime]
    """ When this channel was created """

    # ``always=True``: the field is never supplied by the caller, and pydantic
    # skips validators for missing fields unless told otherwise, which would
    # leave ``created_at`` permanently ``None``.
    @pydantic.validator("created_at", always=True)
    def _validate_snowflake(cls, _, **kwargs) -> Optional[datetime.datetime]:
        """Derive ``created_at`` from the timestamp embedded in the channel ID.

        Raises
        ------
        ValueError
            If a value for ``created_at`` was explicitly provided.
        """
        if _:
            raise ValueError("Time provided, not able to parse")

        channel_id = kwargs["values"].get("id")
        if channel_id is None:
            # ``id`` is absent from the already-validated values (e.g. it
            # failed its own validation); nothing to derive a timestamp from.
            return None

        # Upper bits of the ID hold milliseconds since DISCORD_EPOCH.
        timestamp = ((channel_id >> 22) + DISCORD_EPOCH) / 1000

        return datetime.datetime.fromtimestamp(timestamp)

    async def edit(self, **options) -> Optional[Channel]:
        """|coro|

        Modifies a guild channel, fires a ``channel_update`` event if channel is updated.

        Parameters
        ----------
        name: :class:`str`
            New name for the channel
        type: Literal[0, 5]
            Change the type for the channel, currently only GUILD_TEXT and GUILD_NEWS are supported
        position: :class:`int`
            Change the position of the channel
        topic: :class:`str`
            Change the channels topic, if you wish to remove it use the :class:`MISSING` class
        nsfw: :class:`bool`
            Whether to mark channel as NSFW
        ratelimit: :class:`int`
            Change ratelimit value for channel
        permission_overwrite: List[:class:`PermissionsOverwrite`]
            List of permissions to overwrite in the channel
        category: Union[:class:`int`, CategoryChannel]
            Move the channel to a different category, use :class:`MISSING` for no category
        archive_duration: Literal[0, 60, 1440, 4320, 10080]
            Change the default archive duration on a thread, use :class:`MISSING` or 0 for no timeout
        reason: :class:`str`
            Reason for the edit, sent as the audit log reason header
        """
        validated = ChannelEditPayload(**options).dict()
        # Only forward the fields the caller explicitly supplied; everything
        # else in the validated payload is a model default.
        payload = {key: value for key, value in validated.items() if key in options}

        reason = payload.pop("reason", None)

        headers = {"Content-Type": "application/json"}
        if reason:
            headers["X-Audit-Log-Reason"] = reason

        bucket = dict(channel_id=self.id, guild_id=self.guild_id)

        # Rest should be standard python vars
        await self.conn.request(
            Route("PATCH", path=f"/channels/{self.id}", bucket=bucket),
            data=_payload_dict_to_json(ChannelEditPayload, **payload),
            headers=headers,
        )

    @pydantic.validate_arguments
    async def fetch_messages(
        self,
        *,
        around: Optional[Union[Message, int]] = None,
        before: Optional[Union[Message, int]] = None,
        after: Optional[Union[Message, int]] = None,
        limit: Optional[int] = 50,
    ) -> Iterator[Message]:
        """|coro|

        Fetch messages directly from a channel.
        Every fetched message is added to the client cache before being yielded.

        Parameters
        ----------
        around: Union[:class:`Message`, :class:`int`]
            get messages around this message ID
        before: Union[:class:`Message`, :class:`int`]
            get messages before this message ID
        after: Union[:class:`Message`, :class:`int`]
            get messages after this message ID
        limit: :class:`int`
            max number of messages to return (1-100).

            Defaults to **50**

        Raises
        ------
        ValueError
            If ``limit`` is outside 1-100. As this is an async generator,
            the error surfaces on first iteration.
        """
        # Validate before doing any work; both bounds are inclusive.
        if limit is not None and not 1 <= limit <= 100:
            raise ValueError("Messages to fetch must be an integer between 1 and 100")

        bucket = dict(channel_id=self.id, guild_id=self.guild_id)

        # Accept either message objects or raw IDs.
        around = getattr(around, "id", around)
        before = getattr(before, "id", before)
        after = getattr(after, "id", after)

        params = {"around": around, "before": before, "after": after, "limit": limit}

        resp = await self.conn.request(
            Route("GET", path=f"/channels/{self.id}/messages", bucket=bucket, **params),
        )

        data = await resp.json()

        for message in data:
            msg = Message(**message)
            self.conn.client.cache.add_message(msg)
            yield msg

    async def fetch_webhooks(self) -> Iterator[Webhook]:
        """|coro|

        Fetches all webhooks in channel
        """
        bucket = dict(channel_id=self.id, guild_id=self.guild_id)

        r = await self.conn.request(
            Route("GET", path=f"/channels/{self.id}/webhooks", bucket=bucket)
        )

        for hook in await r.json():
            yield Webhook(adapter=self.conn, **hook)

    @pydantic.validate_arguments
    async def bulk_delete(
        self, *messages: Union[Message, Snowflake], reason: str = None
    ) -> None:
        """|coro|

        Deletes messages in bulk, in a channel.

        .. warning::
            When deleting in bulk, you need at least 2 messages and at most 100.
            You must also provide your own messages to delete,
            for a ``purge`` like method,
            use messages from :meth:`TextChannel.fetch_messages`.

        Parameters
        ----------
        messages: Union[:class:`Message`, :class:`Snowflake`]
            Messages to be deleted
        reason: :class:`str`
            Reason for deleting messages

        Raises
        ------
        ValueError
            If fewer than 2 or more than 100 distinct messages are given.
        """
        # Normalise to IDs and drop duplicates while keeping the given order.
        ids = list(dict.fromkeys(getattr(m, "id", m) for m in messages))

        # Count what is actually sent (the de-duplicated IDs).
        if not 2 <= len(ids) <= 100:
            raise ValueError("Messages to delete must be between 2 and 100")

        headers = dict()
        if reason:
            headers.update({"X-Audit-Log-Reason": reason})

        await self.conn.request(
            Route("POST", path=f"/channels/{self.id}/messages/bulk-delete"),
            data={"messages": ids},
            headers=headers,
        )

        # Purge the deleted messages from the cache without blocking the caller.
        self.conn.client.loop.create_task(
            _pop_task(self.conn.client, self.id, *ids),
            name=f"acord: bulk delete: {len(ids)}",
        )

    # Circular imports - Fix typehint when importing
    async def fetch_invites(self) -> List[Any]:
        """|coro|

        Fetches all invites from channel

        Returns
        -------
        List of ``Invite`` objects built from the response.
        """
        # Imported here rather than at module level because of circular imports.
        from acord.models import Invite

        r = await self.conn.request(Route("GET", path=f"/channels/{self.id}/invites"))
        invites = await r.json()

        return [Invite(**inv) for inv in invites]

    async def create_invite(self, *, reason: str = None, **data) -> Any:
        """|coro|

        Creates new invite in channel

        Parameters
        ----------
        reason: :class:`str`
            Reason for creating the invite
        max_age: :class:`int`
            How long the invite can be used for,
            must be greater or equal to 0
            and less then 604800 (7 Days).

            .. note::
                0 is for never
        max_uses: :class:`int`
            How many times invite can be used,
            before expiring.
            Must be greater or equal to 0
            and less then 100.

            .. note::
                0 is for infinite
        temporary: :class:`bool`
            Whether this invite only grants temporary membership
        unique: :class:`bool`
            If true,
            don't try to reuse a similar invite
            (useful for creating many unique one time use invites)

        Returns
        -------
        The ``Invite`` built from the response.
        """
        # Imported here rather than at module level because of circular imports.
        from acord.models import Invite

        headers = {"Content-Type": "application/json"}
        if reason:
            # Same audit header name as every other method of this class.
            headers["X-Audit-Log-Reason"] = reason

        # Send serialised JSON like the other payload-based methods; an empty
        # JSON object is sent when no options are given.
        body = InviteCreatePayload(**data).json() if data else "{}"

        r = await self.conn.request(
            Route("POST", path=f"/channels/{self.id}/invites"),
            data=body,
            headers=headers,
        )

        return Invite(**(await r.json()))

    @pydantic.validate_arguments
    async def follow(
        self, *, channel: Union[Channel, Snowflake]
    ) -> Dict[str, Snowflake]:
        """|coro|

        Follows a guild news channel

        Parameters
        ----------
        channel: Union[:class:`Channel`, :class:`Snowflake`]
            Target channel,
            or channel to receive messages from this channel.

        Returns
        -------
        A dictionary with the keys:

        * channel_id: source channel id
        * webhook_id: created target webhook id
        """
        if isinstance(channel, Channel):
            channel = channel.id

        r = await self.conn.request(
            Route("POST", path=f"/channels/{self.id}/followers"),
            data={"webhook_channel_id": channel},
        )

        return await r.json()

    async def create_thread(
        self,
        *,
        message: Union[Message, Snowflake] = None,
        reason: str = None,
        **options,
    ) -> Optional[Thread]:
        """|coro|

        Creates a thread in this channel.
        The new thread is also stored in ``self.guild.threads``.

        Parameters
        ----------
        message: Union[:class:`Message`, :class:`Snowflake`]
            Message to start thread with
        reason: :class:`str`
            Reason for creating the thread
        **options:
            Remaining keyword arguments are passed to ``ThreadCreatePayload``
        """
        if message:
            message_id = int(getattr(message, "id", message))
            path = f"/channels/{self.id}/messages/{message_id}/threads"
        else:
            path = f"/channels/{self.id}/threads"

        data = ThreadCreatePayload(**options)

        headers = {"Content-Type": "application/json"}
        if reason:
            headers["X-Audit-Log-Reason"] = str(reason)

        r = await self.conn.request(
            Route("POST", path=path), headers=headers, data=data.json()
        )

        thread = Thread(conn=self.conn, **(await r.json()))
        self.guild.threads.update({thread.id: thread})

        return thread

    async def fetch_active_threads(self) -> Iterator[Thread]:
        """|coro|

        Fetches all active threads in channel.
        Each thread is stored in ``self.guild.threads`` before being yielded.

        .. warning::
            .. rubric:: Depreciated

            This method will no longer work when using API Version >= 10,
            instead implement :meth:`Guild.fetch_active_threads`

        Raises
        ------
        APIObjectDepreciated
            If the client's gateway version is 10 or above.
        """
        version = self.conn.client.gateway_version
        if isinstance(version, str):
            # Presumably formatted like "v9" - TODO confirm. Read every digit
            # after the prefix character: "v10"[1] alone would parse as 1 and
            # the guard below could never trigger.
            tail = version[1:]
            major = int(tail[: len(tail) - len(tail.lstrip("0123456789"))])
        else:
            major = int(version[1])

        if major >= 10:
            raise APIObjectDepreciated(
                "This method has been dropped, "
                "please use `Guild.active_threads`"
            )

        r = await self.conn.request(
            Route("GET", path=f"/channels/{self.id}/threads/active")
        )
        body = await r.json()

        for thread in body["threads"]:
            # Pass the connection along, as create_thread does.
            tr = Thread(conn=self.conn, **thread)
            self.guild.threads.update({tr.id: tr})
            yield tr

    async def _fetch_archived_threads(
        self, path: str, before: Optional[datetime.datetime], limit: Optional[int]
    ):
        """Shared implementation of the archived-thread fetchers.

        Requests ``path``, stores every returned thread in
        ``self.guild.threads`` and yields it.
        """
        params = dict()
        if before:
            params.update(before=before.isoformat())
        if limit:
            params.update(limit=int(limit))

        # Filters go on the route like fetch_messages does, not in a GET body.
        r = await self.conn.request(Route("GET", path=path, **params))
        data = await r.json()

        for thread in data["threads"]:
            # Pass the connection along, as create_thread does.
            tr = Thread(conn=self.conn, **thread)
            self.guild.threads.update({tr.id: tr})
            yield tr

    async def fetch_public_archived_threads(
        self, *, before: datetime.datetime = None, limit: int = None
    ) -> Iterator[Thread]:
        """|coro|

        Fetches all public archived thread in channel

        Parameters
        ----------
        before: :class:`datetime.datetime`
            Only return threads before this timestamp
        limit: :class:`int`
            Max number of threads to return
        """
        path = f"/channels/{self.id}/threads/archived/public"

        async for thread in self._fetch_archived_threads(path, before, limit):
            yield thread

    async def fetch_private_archived_threads(
        self, *, before: datetime.datetime = None, limit: int = None
    ) -> Iterator[Thread]:
        """|coro|

        Fetches all private archived thread in channel

        Parameters
        ----------
        before: :class:`datetime.datetime`
            Only return threads before this timestamp
        limit: :class:`int`
            Max number of threads to return
        """
        path = f"/channels/{self.id}/threads/archived/private"

        async for thread in self._fetch_archived_threads(path, before, limit):
            yield thread

    async def fetch_joined_private_archived_threads(
        self, *, before: datetime.datetime = None, limit: int = None
    ) -> Iterator[Thread]:
        """|coro|

        Fetches all private archived threads,
        that the client has joined

        Parameters
        ----------
        before: :class:`datetime.datetime`
            Only return threads before this timestamp
        limit: :class:`int`
            Max number of threads to return
        """
        path = f"/channels/{self.id}/users/@me/threads/archived/private"

        async for thread in self._fetch_archived_threads(path, before, limit):
            yield thread

    async def create_webhook(self, *, reason: str = None, **data) -> Webhook:
        """|coro|

        Creates a new webhook for this channel

        Parameters
        ----------
        name: :class:`str`
            Name of new webhook
        avatar: :class:`Avatar`
            Avatar for webhook
        reason: :class:`str`
            Reason for creating webhook
        """
        payload = WebhookCreatePayload(**data)

        # The body is JSON; the audit reason belongs only in its own header.
        headers = {"Content-Type": "application/json"}
        if reason:
            headers["X-Audit-Log-Reason"] = reason

        r = await self.conn.request(
            Route("POST", path=f"/channels/{self.id}/webhooks"),
            data=payload.json(),
            headers=headers,
        )

        # Pass the connection as adapter, as fetch_webhooks does.
        return Webhook(adapter=self.conn, **(await r.json()))

    @property
    def guild(self):
        """Returns the guild where the channel was created.

        Raises
        ------
        ValueError
            If the client can no longer find the guild for ``guild_id``.
        """
        guild = self.conn.client.get_guild(self.guild_id)

        if not guild:
            raise ValueError("Target guild can no longer be found")

        return guild