Source code for fbchat._threads._abc

import abc
import attr
import collections
import datetime
from .._common import log, attrs_default
from .. import _util, _exception, _session, _graphql, _models
from typing import MutableMapping, Mapping, Any, Iterable, Tuple, Optional


DEFAULT_COLOR = "#0084ff"
SETABLE_COLORS = (
    DEFAULT_COLOR,
    "#44bec7",
    "#ffc300",
    "#fa3c4c",
    "#d696bb",
    "#6699cc",
    "#13cf13",
    "#ff7e29",
    "#e68585",
    "#7646ff",
    "#20cef5",
    "#67b868",
    "#d4a88c",
    "#ff5ca1",
    "#a695c7",
    "#ff7ca8",
    "#1adb5b",
    "#f01d6a",
    "#ff9c19",
    "#0edcde",
)


[docs]class ThreadABC(metaclass=abc.ABCMeta): """Implemented by thread-like classes. This is private to implement. """ @property @abc.abstractmethod def session(self) -> _session.Session: """The session to use when making requests.""" raise NotImplementedError @property @abc.abstractmethod def id(self) -> str: """The unique identifier of the thread.""" raise NotImplementedError @abc.abstractmethod def _to_send_data(self) -> MutableMapping[str, str]: raise NotImplementedError # Note: # You can go out of Facebook's spec with `self.session._do_send_request`! # # A few examples: # - You can send a sticker and an emoji at the same time # - You can wave, send a sticker and text at the same time # - You can reply to a message with a sticker # # We won't support those use cases, it'll make for a confusing API! # If we absolutely need to in the future, we can always add extra functionality @abc.abstractmethod def _copy(self) -> "ThreadABC": """It may or may not be a good idea to attach the current thread to new objects. So for now, we use this method to create a new thread. This should return the minimal representation of the thread (e.g. not UserData). """ raise NotImplementedError def fetch(self): # TODO: This raise NotImplementedError
[docs] def wave(self, first: bool = True) -> str: """Wave hello to the thread. Args: first: Whether to wave first or wave back Example: Wave back to the thread. >>> thread.wave(False) """ data = self._to_send_data() data["action_type"] = "ma-type:user-generated-message" data["lightweight_action_attachment[lwa_state]"] = ( "INITIATED" if first else "RECIPROCATED" ) data["lightweight_action_attachment[lwa_type]"] = "WAVE" message_id, thread_id = self.session._do_send_request(data) return message_id
[docs] def send_text( self, text: str, mentions: Iterable["_models.Mention"] = None, files: Iterable[Tuple[str, str]] = None, reply_to_id: str = None, ) -> str: """Send a message to the thread. Args: text: Text to send mentions: Optional mentions files: Optional tuples, each containing an uploaded file's ID and mimetype. See `ThreadABC.send_files` for an example. reply_to_id: Optional message to reply to Example: >>> mention = fbchat.Mention(thread_id="1234", offset=5, length=2) >>> thread.send_text("A message", mentions=[mention]) Returns: The sent message """ data = self._to_send_data() data["action_type"] = "ma-type:user-generated-message" if text is not None: # To support `send_files` data["body"] = text for i, mention in enumerate(mentions or ()): data.update(mention._to_send_data(i)) if files: data["has_attachment"] = True for i, (file_id, mimetype) in enumerate(files or ()): data["{}s[{}]".format(_util.mimetype_to_key(mimetype), i)] = file_id if reply_to_id: data["replied_to_message_id"] = reply_to_id return self.session._do_send_request(data)
[docs] def send_emoji(self, emoji: str, size: "_models.EmojiSize") -> str: """Send an emoji to the thread. Args: emoji: The emoji to send size: The size of the emoji Example: >>> thread.send_emoji("😀", size=fbchat.EmojiSize.LARGE) Returns: The sent message """ data = self._to_send_data() data["action_type"] = "ma-type:user-generated-message" data["body"] = emoji data["tags[0]"] = "hot_emoji_size:{}".format(size.name.lower()) return self.session._do_send_request(data)
[docs] def send_sticker(self, sticker_id: str) -> str: """Send a sticker to the thread. Args: sticker_id: ID of the sticker to send Example: Send a sticker with the id "1889713947839631" >>> thread.send_sticker("1889713947839631") Returns: The sent message """ data = self._to_send_data() data["action_type"] = "ma-type:user-generated-message" data["sticker_id"] = sticker_id return self.session._do_send_request(data)
def _send_location(self, current, latitude, longitude): data = self._to_send_data() data["action_type"] = "ma-type:user-generated-message" data["location_attachment[coordinates][latitude]"] = latitude data["location_attachment[coordinates][longitude]"] = longitude data["location_attachment[is_current_location]"] = current return self.session._do_send_request(data)
[docs] def send_location(self, latitude: float, longitude: float): """Send a given location to a thread as the user's current location. Args: latitude: The location latitude longitude: The location longitude Example: Send a location in London, United Kingdom. >>> thread.send_location(51.5287718, -0.2416815) """ self._send_location(True, latitude=latitude, longitude=longitude)
[docs] def send_pinned_location(self, latitude: float, longitude: float): """Send a given location to a thread as a pinned location. Args: latitude: The location latitude longitude: The location longitude Example: Send a pinned location in Beijing, China. >>> thread.send_pinned_location(39.9390731, 116.117273) """ self._send_location(False, latitude=latitude, longitude=longitude)
[docs] def send_files(self, files: Iterable[Tuple[str, str]]): """Send files from file IDs to a thread. `files` should be a list of tuples, with a file's ID and mimetype. Example: Upload and send a video to a thread. >>> with open("video.mp4", "rb") as f: ... files = client.upload([("video.mp4", f, "video/mp4")]) >>> >>> thread.send_files(files) """ return self.send_text(text=None, files=files)
# xmd = {"quick_replies": []} # for quick_reply in quick_replies: # # TODO: Move this to `_quick_reply.py` # q = dict() # q["content_type"] = quick_reply._type # q["payload"] = quick_reply.payload # q["external_payload"] = quick_reply.external_payload # q["data"] = quick_reply.data # if quick_reply.is_response: # q["ignore_for_webhook"] = False # if isinstance(quick_reply, _quick_reply.QuickReplyText): # q["title"] = quick_reply.title # if not isinstance(quick_reply, _quick_reply.QuickReplyLocation): # q["image_url"] = quick_reply.image_url # xmd["quick_replies"].append(q) # if len(quick_replies) == 1 and quick_replies[0].is_response: # xmd["quick_replies"] = xmd["quick_replies"][0] # data["platform_xmd"] = _util.json_minimal(xmd) # TODO: This! # def quick_reply(self, quick_reply: QuickReply, payload=None): # """Reply to chosen quick reply. # # Args: # quick_reply: Quick reply to reply to # payload: Optional answer to the quick reply # """ # if isinstance(quick_reply, QuickReplyText): # new = QuickReplyText( # payload=quick_reply.payload, # external_payload=quick_reply.external_payload, # data=quick_reply.data, # is_response=True, # title=quick_reply.title, # image_url=quick_reply.image_url, # ) # return self.send(Message(text=quick_reply.title, quick_replies=[new])) # elif isinstance(quick_reply, QuickReplyLocation): # if not isinstance(payload, LocationAttachment): # raise TypeError("Payload must be an instance of `LocationAttachment`") # return self.send_location(payload) # elif isinstance(quick_reply, QuickReplyEmail): # new = QuickReplyEmail( # payload=payload if payload else self.get_emails()[0], # external_payload=quick_reply.payload, # data=quick_reply.data, # is_response=True, # image_url=quick_reply.image_url, # ) # return self.send(Message(text=payload, quick_replies=[new])) # elif isinstance(quick_reply, QuickReplyPhoneNumber): # new = QuickReplyPhoneNumber( # payload=payload if payload else self.get_phone_numbers()[0], # external_payload=quick_reply.payload, # data=quick_reply.data, # is_response=True, # image_url=quick_reply.image_url, # ) # return self.send(Message(text=payload, quick_replies=[new])) def _search_messages(self, query, offset, limit): data = { "query": query, "snippetOffset": offset, "snippetLimit": limit, "identifier": "thread_fbid", "thread_fbid": self.id, } j = self.session._payload_post("/ajax/mercury/search_snippets.php?dpr=1", data) result = j["search_snippets"][query].get(self.id) if not result: return (0, []) thread = self._copy() snippets = [ _models.MessageSnippet._parse(thread, snippet) for snippet in result["snippets"] ] return (result["num_total_snippets"], snippets)
[docs] def search_messages( self, query: str, limit: int ) -> Iterable[_models.MessageSnippet]: """Find and get message IDs by query. Warning! If someone send a message to the thread that matches the query, while we're searching, some snippets will get returned twice. This is fundamentally not fixable, it's just how the endpoint is implemented. The returned message snippets are ordered by last sent first. Args: query: Text to search for limit: Max. number of message snippets to retrieve Example: Fetch the latest message in the thread that matches the query. >>> (message,) = thread.search_messages("abc", limit=1) >>> message.text "Some text and abc" """ offset = 0 # The max limit is measured empirically to 420, safe default chosen below for limit in _util.get_limits(limit, max_limit=50): _, snippets = self._search_messages(query, offset, limit) yield from snippets if len(snippets) < limit: return # No more data to fetch offset += limit
def _fetch_messages(self, limit, before): params = { "id": self.id, "message_limit": limit, "load_messages": True, "load_read_receipts": True, # "load_delivery_receipts": False, # "is_work_teamwork_not_putting_muted_in_unreads": False, "before": _util.datetime_to_millis(before) if before else None, } (j,) = self.session._graphql_requests( _graphql.from_doc_id("1860982147341344", params) # 2696825200377124 ) if j.get("message_thread") is None: raise _exception.ParseError("Could not fetch messages", data=j) # TODO: Should we parse the returned thread data, too? read_receipts = j["message_thread"]["read_receipts"]["nodes"] thread = self._copy() return [ _models.MessageData._from_graphql(thread, message, read_receipts) for message in j["message_thread"]["messages"]["nodes"] ]
[docs] def fetch_messages(self, limit: Optional[int]) -> Iterable["_models.Message"]: """Fetch messages in a thread. The returned messages are ordered by last sent first. Args: limit: Max. number of threads to retrieve. If ``None``, all threads will be retrieved. Example: >>> for message in thread.fetch_messages(limit=5) ... print(message.text) ... A message Another message None A fourth message """ # This is measured empirically as 210 in extreme cases, fairly safe default # chosen below MAX_BATCH_LIMIT = 100 before = None for limit in _util.get_limits(limit, MAX_BATCH_LIMIT): messages = self._fetch_messages(limit, before) messages.reverse() if before: # Strip the first messages yield from messages[1:] else: yield from messages if len(messages) < MAX_BATCH_LIMIT: return # No more data to fetch before = messages[-1].created_at
def _fetch_images(self, limit, after): data = {"id": self.id, "first": limit, "after": after} (j,) = self.session._graphql_requests( _graphql.from_query_id("515216185516880", data) ) if not j[self.id]: raise _exception.ParseError("Could not find images", data=j) result = j[self.id]["message_shared_media"] rtn = [] for edge in result["edges"]: node = edge["node"] type_ = node["__typename"] if type_ == "MessageImage": rtn.append(_models.ImageAttachment._from_list(node)) elif type_ == "MessageVideo": rtn.append(_models.VideoAttachment._from_list(node)) else: log.warning("Unknown image type %s, data: %s", type_, edge) rtn.append(None) # result["page_info"]["has_next_page"] is not correct when limit > 12 return (result["page_info"]["end_cursor"], rtn)
[docs] def fetch_images(self, limit: Optional[int]) -> Iterable["_models.Attachment"]: """Fetch images/videos posted in the thread. The returned images are ordered by last sent first. Args: limit: Max. number of images to retrieve. If ``None``, all images will be retrieved. Example: >>> for image in thread.fetch_messages(limit=3) ... print(image.id) ... 1234 2345 """ cursor = None # The max limit on this request is unknown, so we set it reasonably high # This way `limit=None` also still works for limit in _util.get_limits(limit, max_limit=1000): cursor, images = self._fetch_images(limit, cursor) if not images: return # No more data to fetch for image in images: if image: yield image
[docs] def set_nickname(self, user_id: str, nickname: str): """Change the nickname of a user in the thread. Args: user_id: User that will have their nickname changed nickname: New nickname Example: >>> thread.set_nickname("1234", "A nickname") """ data = { "nickname": nickname, "participant_id": user_id, "thread_or_other_fbid": self.id, } j = self.session._payload_post( "/messaging/save_thread_nickname/?source=thread_settings&dpr=1", data )
[docs] def set_color(self, color: str): """Change thread color. The new color must be one of the following:: "#0084ff", "#44bec7", "#ffc300", "#fa3c4c", "#d696bb", "#6699cc", "#13cf13", "#ff7e29", "#e68585", "#7646ff", "#20cef5", "#67b868", "#d4a88c", "#ff5ca1", "#a695c7", "#ff7ca8", "#1adb5b", "#f01d6a", "#ff9c19" or "#0edcde". This list is subject to change in the future! The default when creating a new thread is ``"#0084ff"``. Args: color: New thread color Example: Set the thread color to "Coral Pink". >>> thread.set_color("#e68585") """ if color not in SETABLE_COLORS: raise ValueError( "Invalid color! Please use one of: {}".format(SETABLE_COLORS) ) # Set color to "" if DEFAULT_COLOR. Just how the endpoint works... if color == DEFAULT_COLOR: color = "" data = {"color_choice": color, "thread_or_other_fbid": self.id} j = self.session._payload_post( "/messaging/save_thread_color/?source=thread_settings&dpr=1", data )
# def set_theme(self, theme_id: str): # data = { # "client_mutation_id": "0", # "actor_id": self.session.user.id, # "thread_id": self.id, # "theme_id": theme_id, # "source": "SETTINGS", # } # j = self.session._graphql_requests( # _graphql.from_doc_id("1768656253222505", {"data": data}) # )
[docs] def set_emoji(self, emoji: Optional[str]): """Change thread emoji. Args: emoji: New thread emoji. If ``None``, will be set to the default "LIKE" icon Example: Set the thread emoji to "😊". >>> thread.set_emoji("😊") """ data = {"emoji_choice": emoji, "thread_or_other_fbid": self.id} # While changing the emoji, the Facebook web client actually sends multiple # different requests, though only this one is required to make the change. j = self.session._payload_post( "/messaging/save_thread_emoji/?source=thread_settings&dpr=1", data )
[docs] def forward_attachment(self, attachment_id: str): """Forward an attachment. Args: attachment_id: Attachment ID to forward Example: >>> thread.forward_attachment("1234") """ data = { "attachment_id": attachment_id, "recipient_map[{}]".format(_util.generate_offline_threading_id()): self.id, } j = self.session._payload_post("/mercury/attachments/forward/", data) if not j.get("success"): raise _exception.ExternalError("Failed forwarding attachment", j["error"])
def _set_typing(self, typing): data = { "typ": "1" if typing else "0", "thread": self.id, # TODO: This # "to": self.id if isinstance(self, _user.User) else "", "source": "mercury-chat", } j = self.session._payload_post("/ajax/messaging/typ.php", data)
[docs] def start_typing(self): """Set the current user to start typing in the thread. Example: >>> thread.start_typing() """ self._set_typing(True)
[docs] def stop_typing(self): """Set the current user to stop typing in the thread. Example: >>> thread.stop_typing() """ self._set_typing(False)
[docs] def create_plan( self, name: str, at: datetime.datetime, location_name: str = None, location_id: str = None, ): """Create a new plan. # TODO: Arguments Args: name: Name of the new plan at: When the plan is for Example: >>> thread.create_plan(...) """ return _models.Plan._create(self, name, at, location_name, location_id)
[docs] def create_poll(self, question: str, options: Mapping[str, bool]): """Create poll in a thread. Args: question: The question options: Options and whether you want to select the option Example: >>> thread.create_poll("Test poll", {"Option 1": True, "Option 2": False}) """ # We're using ordered dictionaries, because the Facebook endpoint that parses # the POST parameters is badly implemented, and deals with ordering the options # wrongly. If you can find a way to fix this for the endpoint, or if you find # another endpoint, please do suggest it ;) data = collections.OrderedDict( [("question_text", question), ("target_id", self.id)] ) for i, (text, vote) in enumerate(options.items()): data["option_text_array[{}]".format(i)] = text data["option_is_selected_array[{}]".format(i)] = "1" if vote else "0" j = self.session._payload_post( "/messaging/group_polling/create_poll/?dpr=1", data ) if j.get("status") != "success": raise _exception.ExternalError( "Failed creating poll: {}".format(j.get("errorTitle")), j.get("errorMessage"), )
[docs] def mute(self, duration: datetime.timedelta = None): """Mute the thread. Args: duration: Time to mute, use ``None`` to mute forever Example: >>> import datetime >>> thread.mute(datetime.timedelta(days=2)) """ if duration is None: setting = "-1" else: setting = str(_util.timedelta_to_seconds(duration)) data = {"mute_settings": setting, "thread_fbid": self.id} j = self.session._payload_post( "/ajax/mercury/change_mute_thread.php?dpr=1", data )
[docs] def unmute(self): """Unmute the thread. Example: >>> thread.unmute() """ return self.mute(datetime.timedelta(0))
def _mute_reactions(self, mode: bool): data = {"reactions_mute_mode": "1" if mode else "0", "thread_fbid": self.id} j = self.session._payload_post( "/ajax/mercury/change_reactions_mute_thread/?dpr=1", data )
[docs] def mute_reactions(self): """Mute thread reactions.""" self._mute_reactions(True)
[docs] def unmute_reactions(self): """Unmute thread reactions.""" self._mute_reactions(False)
def _mute_mentions(self, mode: bool): data = {"mentions_mute_mode": "1" if mode else "0", "thread_fbid": self.id} j = self.session._payload_post( "/ajax/mercury/change_mentions_mute_thread/?dpr=1", data )
[docs] def mute_mentions(self): """Mute thread mentions.""" self._mute_mentions(True)
[docs] def unmute_mentions(self): """Unmute thread mentions.""" self._mute_mentions(False)
[docs] def mark_as_spam(self): """Mark the thread as spam, and delete it.""" data = {"id": self.id} j = self.session._payload_post("/ajax/mercury/mark_spam.php?dpr=1", data)
@staticmethod def _delete_many(session, thread_ids): data = {} for i, id_ in enumerate(thread_ids): data["ids[{}]".format(i)] = id_ # Not needed any more # j = session._payload_post("/ajax/mercury/change_pinned_status.php?dpr=1", ...) # Both /ajax/mercury/delete_threads.php (with an s) doesn't work j = session._payload_post("/ajax/mercury/delete_thread.php", data)
[docs] def delete(self): """Delete the thread. If you want to delete multiple threads, please use `Client.delete_threads`. Example: >>> message.delete() """ self._delete_many(self.session, [self.id])
def _forced_fetch(self, message_id: str) -> dict: params = { "thread_and_message_id": {"thread_id": self.id, "message_id": message_id} } (j,) = self.session._graphql_requests( _graphql.from_doc_id("1768656253222505", params) ) return j @staticmethod def _parse_color(inp: Optional[str]) -> str: if not inp: return DEFAULT_COLOR # Strip the alpha value, and lower the string return "#{}".format(inp[2:].lower()) @staticmethod def _parse_customization_info(data: Any) -> MutableMapping[str, Any]: if not data or not data.get("customization_info"): return {"emoji": None, "color": DEFAULT_COLOR} info = data["customization_info"] rtn = { "emoji": info.get("emoji"), "color": ThreadABC._parse_color(info.get("outgoing_bubble_color")), } if ( data.get("thread_type") == "GROUP" or data.get("is_group_thread") or data.get("thread_key", {}).get("thread_fbid") ): rtn["nicknames"] = {} for k in info.get("participant_customizations", []): rtn["nicknames"][k["participant_id"]] = k.get("nickname") elif info.get("participant_customizations"): user_id = data.get("thread_key", {}).get("other_user_id") or data.get("id") pc = info["participant_customizations"] if len(pc) > 0: if pc[0].get("participant_id") == user_id: rtn["nickname"] = pc[0].get("nickname") else: rtn["own_nickname"] = pc[0].get("nickname") if len(pc) > 1: if pc[1].get("participant_id") == user_id: rtn["nickname"] = pc[1].get("nickname") else: rtn["own_nickname"] = pc[1].get("nickname") return rtn @staticmethod def _parse_participants(session, data) -> Iterable["ThreadABC"]: from . import _user, _group, _page for node in data["nodes"]: actor = node["messaging_actor"] typename = actor["__typename"] thread_id = actor["id"] if typename == "User": yield _user.User(session=session, id=thread_id) elif typename == "MessageThread": # MessageThread => Group thread yield _group.Group(session=session, id=thread_id) elif typename == "Page": yield _page.Page(session=session, id=thread_id) elif typename == "Group": # We don't handle Facebook "Groups" pass else: log.warning("Unknown type %r in %s", typename, data)
[docs]@attrs_default class Thread(ThreadABC): """Represents a Facebook thread, where the actual type is unknown. Implements parts of `ThreadABC`, call the method to figure out if your use case is supported. Otherwise, you'll have to use an `User`/`Group`/`Page` object. Note: This list may change in minor versions! """ #: The session to use when making requests. session = attr.ib(type=_session.Session) #: The unique identifier of the thread. id = attr.ib(converter=str, type=str) def _to_send_data(self): raise NotImplementedError( "The method you called is not supported on raw Thread objects." " Please use an appropriate User/Group/Page object instead!" ) def _copy(self) -> "Thread": return Thread(session=self.session, id=self.id)