from __future__ import annotations
from typing import Any, Dict, Iterator, List, Literal, Optional, Union
import pydantic
import datetime
from acord.core.abc import Route
from acord.models import Snowflake, Member
from acord.payloads import ThreadEditPayload
from acord.utils import _payload_dict_to_json
from .base import Channel
from .textExt import ExtendedTextMethods
[docs]class ThreadMember(pydantic.BaseModel):
id: Snowflake
"""ID of thread"""
user_id: Snowflake
"""ID of user in thread"""
join_timestamp: datetime.datetime
"""When did this user join this thread"""
flags: int
"""Thread member flags"""
[docs]class Thread(Channel, ExtendedTextMethods):
conn: Any
id: Snowflake
""" Thread id """
guild_id: Snowflake
""" Guild id of were thread belongs """
parent_id: Snowflake
""" Channel id of were thread was created from """
owner_id: Snowflake
""" Id of user who created this thread """
name: str
""" Name of thread """
last_message_id: Optional[Snowflake]
""" Last message sent in thread """
last_pin_timestamp: Optional[datetime.datetime]
""" Last pinned message in thread """
rate_limit_per_user: Optional[int]
""" amount of seconds a user has to wait before sending another message """
message_count: Optional[int]
""" approx amount of message in thread, stops counting after 50 """
member_count: Optional[int]
""" aprox members in thread, stops counting after 50 """
thread_metadata: ThreadMeta
""" Additional data about thread """
members: Dict[Snowflake, ThreadMember] = dict()
""" Mapping of members in thread, fills with each fetch """
[docs] async def join(self) -> None:
"""|coro|
Joins current thread
"""
await self.add_member(member="@me")
[docs] async def leave(self) -> None:
"""|coro|
Leaves current thread
"""
await self.remove_member(member="@me")
[docs] async def add_member(self, *, member: Union[Member, Snowflake]) -> None:
"""|coro|
Adds a member to the thread
Parameters
----------
member: Union[:class:`Member`, :class:`Snowflake`]
"""
if member != "@me":
member = member.id
await self.conn.request(
Route("PUT", path=f"/channels/{self.id}/thread-members/{member}")
)
[docs] async def remove_member(self, *, member: Union[Member, Snowflake]) -> None:
"""|coro|
Removes a member from the thread
Parameters
----------
member: Union[:class:`Member`, :class:`Snowflake`]
"""
if isinstance(member, Union[Member, Snowflake]):
member = member.id
await self.conn.request(
Route("DELETE", path=f"/channels/{self.id}/thread-members/{member}")
)
self.members.pop(member, None)
[docs] async def fetch_member(
self, *, member: Union[Member, Snowflake], as_guild_member: bool = False
) -> Optional[Union[ThreadMember, Member]]:
"""|coro|
Fetches member from thread
Parameters
----------
member: Union[:class:`Member`, :class:`Snowflake`]
Member to fetch
as_guild_member: :class:`bool`
whether to return the fetched member as a :class:`ThreadMember` object or :class:`Member`
"""
if isinstance(member, Member):
member = member.id
r = await self.conn.request(
Route("GET", path=f"/channels/{self.id}/thread-members/{member}")
)
tmember = ThreadMember(**(await r.json()))
self.members.update({member.user_id: tmember})
if not as_guild_member:
member = tmember
else:
id = (await r.json())["user_id"]
guild = self.conn.client.get_guild(self.guild_id)
member = guild.get_member(id)
return member
[docs] async def fetch_members(
self, *, as_guild_member: bool = False
) -> Iterator[Union[Member, ThreadMember]]:
"""|coro|
Fetches all members in the thread
Parameters
----------
as_guild_member: :class:`bool`
Whether to return the thread members as :class:`ThreadMember` or :class:`Member`
"""
r = await self.conn.request(
Route("GET", path=f"/channels/{self.id}/thread-members")
)
members = await r.json()
guild = self.conn.client.get_guild(self.guild_id)
for member in members:
nmember = ThreadMember(**member)
self.members.update({nmember.user_id: nmember})
if as_guild_member:
id = member["user_id"]
yield guild.get_member(id)
else:
yield nmember
[docs] async def edit(self, *, reason: str = None, **options) -> Optional[Thread]:
"""|coro|
Edits the current thread
Parameters
----------
reason: :class:`str`
Reason for editing thread
name: :class:`str`
New name for thread
archived: :class:`bool`
Whether the thread should be archived or not
auto_archive_duration: :class:`int`
New auto archive duration,
is set from parent channel OR during creation
rate_limit_per_user: :class:`int`
New slowmode for users in thread
"""
headers = dict({"Content-Type": "application/json"})
if reason:
headers.update({"X-Audit-Log-Reason": str(reason)})
bucket = dict(guild_id=self.guild_id, channel_id=self.id)
r = await self.conn.request(
Route("PATCH", path=f"/channels/{self.id}", bucket=bucket),
data=_payload_dict_to_json(ThreadEditPayload, **options),
headers=headers,
)
guild = self.conn.client.get_guild(self.guild_id)
tr = Thread(**(await r.json()))
guild.threads.update({tr.id: tr})
return tr