import attr
import datetime
import enum
from string import Formatter
from . import _attachment, _location, _file, _quick_reply, _sticker
from .._common import log, attrs_default
from .. import _exception, _util
from typing import Optional, Mapping, Sequence, Any
class EmojiSize(enum.Enum):
    """Used to specify the size of a sent emoji.

    Each member's value is an opaque identifier string.
    """

    LARGE = "369239383222810"
    MEDIUM = "369239343222814"
    SMALL = "369239263222822"

    @classmethod
    def _from_tags(cls, tags):
        """Return the emoji size encoded in a list of message tags.

        Args:
            tags: Iterable of tag strings, or ``None``.

        Returns:
            The member named by the first ``hot_emoji_size:<size>`` tag, where
            ``<size>`` is ``large``/``medium``/``small`` or ``l``/``m``/``s``.
            ``None`` if there is no such tag or the size is not recognised.
        """
        sizes_by_name = {
            "large": cls.LARGE,
            "medium": cls.MEDIUM,
            "small": cls.SMALL,
            "l": cls.LARGE,
            "m": cls.MEDIUM,
            "s": cls.SMALL,
        }
        for tag in tags or ():
            key, separator, size = tag.partition(":")
            if separator and key == "hot_emoji_size":
                # Only the first size tag is considered, even if its value is
                # unknown (in which case the result is ``None``).
                return sizes_by_name.get(size)
        return None
@attrs_default
class Mention:
    """Represents a ``@mention``.

    >>> fbchat.Mention(thread_id="1234", offset=5, length=2)
    Mention(thread_id="1234", offset=5, length=2)
    """

    #: The thread ID the mention is pointing at
    thread_id = attr.ib(type=str)
    #: The character where the mention starts
    offset = attr.ib(type=int)
    #: The length of the mention
    length = attr.ib(type=int)

    @classmethod
    def _from_range(cls, data):
        """Build a mention from a range mapping.

        Reads the ``entity``, ``offset`` and ``length`` keys of ``data``.
        ``thread_id`` is ``None`` when the entity, or its ``id``, is missing.
        """
        # TODO: Parse data["entity"]["__typename"]
        # The entity (and its id) can be missing; don't fail on that
        entity = data.get("entity") or {}
        return cls(
            thread_id=entity.get("id"),
            offset=data["offset"],
            length=data["length"],
        )

    @classmethod
    def _from_prng(cls, data):
        """Build a mention from a mapping with the short keys ``i``/``o``/``l``."""
        return cls(thread_id=data["i"], offset=data["o"], length=data["l"])

    def _to_send_data(self, i):
        """Return the form fields describing this mention.

        Args:
            i: Index of the mention, embedded in every field name.
        """
        prefix = "profile_xmd[{}]".format(i)
        return {
            prefix + "[id]": self.thread_id,
            prefix + "[offset]": self.offset,
            prefix + "[length]": self.length,
            prefix + "[type]": "p",
        }
# Exhaustively searched for options by using the list in:
# https://unicode.org/emoji/charts/full-emoji-list.html
SENDABLE_REACTIONS = ("❤", "😍", "😆", "😮", "😢", "😠", "👍", "👎")
@attrs_default
class Message:
    """Represents a Facebook message.

    Example:
        >>> thread = fbchat.User(session=session, id="1234")
        >>> message = fbchat.Message(thread=thread, id="mid.$XYZ")
    """

    #: The thread that this message belongs to.
    thread = attr.ib()
    #: The message ID.
    id = attr.ib(converter=str, type=str)

    @property
    def session(self):
        """The session to use when making requests."""
        return self.thread.session

    @staticmethod
    def _delete_many(session, message_ids):
        """Post a delete request for every ID in ``message_ids``.

        Args:
            session: Object whose ``_payload_post`` method performs the request.
            message_ids: Iterable of message IDs to delete.
        """
        data = {
            "message_ids[{}]".format(i): id_ for i, id_ in enumerate(message_ids)
        }
        # The response payload is not used
        session._payload_post("/ajax/mercury/delete_messages.php?dpr=1", data)

    def delete(self):
        """Delete the message (removes it only for the user).

        If you want to delete multiple messages, please use `Client.delete_messages`.

        Example:
            >>> message.delete()
        """
        self._delete_many(self.session, [self.id])

    def unsend(self):
        """Unsend the message (removes it for everyone).

        The message must be sent by you, and less than 10 minutes ago.

        Example:
            >>> message.unsend()
        """
        data = {"message_id": self.id}
        # The response payload is not used
        self.session._payload_post("/messaging/unsend_message/?dpr=1", data)

    def react(self, reaction: Optional[str]):
        """React to the message, or removes reaction.

        Currently, you can use "❤", "😍", "😆", "😮", "😢", "😠", "👍" or "👎". It
        should be possible to add support for more, but we haven't figured that out yet.

        Args:
            reaction: Reaction emoji to use, or if ``None``, removes reaction.

        Raises:
            ValueError: If ``reaction`` is given but is not in `SENDABLE_REACTIONS`.

        Example:
            >>> message.react("😍")
        """
        if reaction and reaction not in SENDABLE_REACTIONS:
            raise ValueError(
                "Invalid reaction! Please use one of: {}".format(SENDABLE_REACTIONS)
            )
        mutation = {
            # Any falsy reaction (``None`` or an empty string) removes the reaction
            "action": "ADD_REACTION" if reaction else "REMOVE_REACTION",
            "client_mutation_id": "1",
            "actor_id": self.session.user.id,
            "message_id": self.id,
            "reaction": reaction,
        }
        data = {
            "doc_id": 1491398900900362,
            "variables": _util.json_minimal({"data": mutation}),
        }
        j = self.session._payload_post("/webgraphql/mutation", data)
        _exception.handle_graphql_errors(j)

    def fetch(self) -> "MessageData":
        """Fetch fresh `MessageData` object.

        Example:
            >>> message = message.fetch()
            >>> message.text
            "The message text"
        """
        message_info = self.thread._forced_fetch(self.id).get("message")
        return MessageData._from_graphql(self.thread, message_info)
@attrs_default
class MessageSnippet(Message):
    """Represents data in a Facebook message snippet.

    Inherits `Message`.
    """

    #: ID of the sender
    author = attr.ib(type=str)
    #: When the message was sent
    created_at = attr.ib(type=datetime.datetime)
    #: The actual message
    text = attr.ib(type=str)
    #: A dict with offsets, mapped to the matched text
    matched_keywords = attr.ib(type=Mapping[int, str])

    @classmethod
    def _parse(cls, thread, data):
        """Build a snippet from a mapping.

        Reads the ``message_id``, ``author``, ``timestamp``, ``body`` and
        ``matched_keywords`` keys of ``data``. A leading ``fbid:`` on the author
        is removed, and the keys of ``matched_keywords`` are converted to ``int``.
        """
        author = data["author"]
        # ``str.rstrip("fbid:")`` strips a *set of characters* from the right
        # end, so it never removed the prefix; strip the literal prefix instead.
        prefix = "fbid:"
        if author.startswith(prefix):
            author = author[len(prefix):]
        return cls(
            thread=thread,
            id=data["message_id"],
            author=author,
            created_at=_util.millis_to_datetime(data["timestamp"]),
            text=data["body"],
            matched_keywords={int(k): v for k, v in data["matched_keywords"].items()},
        )
@attrs_default
class MessageData(Message):
    """Represents data in a Facebook message.

    Inherits `Message`.
    """

    #: ID of the sender
    author = attr.ib(type=str)
    #: When the message was sent
    created_at = attr.ib(type=datetime.datetime)
    #: The actual message
    text = attr.ib(None, type=Optional[str])
    #: A list of `Mention` objects
    mentions = attr.ib(factory=list, type=Sequence[Mention])
    #: Size of a sent emoji
    emoji_size = attr.ib(None, type=Optional[EmojiSize])
    #: Whether the message is read
    is_read = attr.ib(None, type=Optional[bool])
    #: People IDs who read the message, only works with `ThreadABC.fetch_messages`
    read_by = attr.ib(factory=list, type=Sequence[str])
    #: A dictionary with user's IDs as keys, and their reaction as values
    reactions = attr.ib(factory=dict, type=Mapping[str, str])
    #: A `Sticker`
    sticker = attr.ib(None, type=Optional[_sticker.Sticker])
    #: A list of attachments
    attachments = attr.ib(factory=list, type=Sequence[_attachment.Attachment])
    #: A list of `QuickReply`
    quick_replies = attr.ib(factory=list, type=Sequence[_quick_reply.QuickReply])
    #: Whether the message is unsent (deleted for everyone)
    unsent = attr.ib(False, type=Optional[bool])
    #: Message ID you want to reply to
    reply_to_id = attr.ib(None, type=Optional[str])
    #: Replied message
    replied_to = attr.ib(None, type=Optional[Any])
    #: Whether the message was forwarded
    forwarded = attr.ib(False, type=Optional[bool])

    @staticmethod
    def _get_forwarded_from_tags(tags):
        """Return whether any tag contains ``"forward"`` or ``"copy"``.

        ``tags`` may be ``None``, in which case the result is ``False``.
        """
        if tags is None:
            return False
        return any("forward" in tag or "copy" in tag for tag in tags)

    @staticmethod
    def _parse_quick_replies(data):
        """Parse the quick replies out of an encoded JSON document.

        Args:
            data: JSON text, or a falsy value when there is nothing to parse.

        Returns:
            A list of quick replies. A ``quick_replies`` list yields one entry
            per item; a single dict is parsed as a response. Anything else
            yields an empty list.
        """
        if data:
            data = _util.parse_json(data).get("quick_replies")
            if isinstance(data, list):
                return [_quick_reply.graphql_to_quick_reply(q) for q in data]
            elif isinstance(data, dict):
                return [_quick_reply.graphql_to_quick_reply(data, is_response=True)]
        return []

    @classmethod
    def _from_graphql(cls, thread, data, read_receipts=None):
        """Build a message from a GraphQL message mapping.

        Args:
            thread: The thread the message belongs to.
            data: The message mapping. It is not modified.
            read_receipts: Optional iterable of mappings with ``actor`` and
                ``watermark`` keys. Receipts whose watermark is at or after the
                message's creation time populate ``read_by``.
        """
        # Fall back to empty mappings locally instead of writing the defaults
        # back into the caller's dict.
        sender = data.get("message_sender")
        if sender is None:
            sender = {}
        message = data.get("message")
        if message is None:
            message = {}

        tags = data.get("tags_list")
        created_at = _util.millis_to_datetime(int(data.get("timestamp_precise")))

        attachments = [
            _file.graphql_to_attachment(attachment)
            for attachment in data.get("blob_attachments") or ()
        ]
        unsent = False
        if data.get("extensible_attachment") is not None:
            attachment = graphql_to_extensible_attachment(data["extensible_attachment"])
            if isinstance(attachment, _attachment.UnsentMessage):
                unsent = True
            elif attachment:
                attachments.append(attachment)

        replied_to = None
        if data.get("replied_to_message") and data["replied_to_message"]["message"]:
            # data["replied_to_message"]["message"] is None if the message is deleted
            replied_to = cls._from_graphql(
                thread, data["replied_to_message"]["message"]
            )

        return cls(
            thread=thread,
            id=str(data["message_id"]),
            author=str(sender["id"]),
            created_at=created_at,
            text=message.get("text"),
            mentions=[Mention._from_range(m) for m in message.get("ranges") or ()],
            emoji_size=EmojiSize._from_tags(tags),
            is_read=not data["unread"] if data.get("unread") is not None else None,
            read_by=[
                receipt["actor"]["id"]
                for receipt in read_receipts or ()
                if _util.millis_to_datetime(int(receipt["watermark"])) >= created_at
            ],
            # Tolerate a missing/empty reaction list, like the other optional keys
            reactions={
                str(r["user"]["id"]): r["reaction"]
                for r in data.get("message_reactions") or ()
            },
            sticker=_sticker.Sticker._from_graphql(data.get("sticker")),
            attachments=attachments,
            quick_replies=cls._parse_quick_replies(data.get("platform_xmd_encoded")),
            unsent=unsent,
            reply_to_id=replied_to.id if replied_to else None,
            replied_to=replied_to,
            forwarded=cls._get_forwarded_from_tags(tags),
        )

    @classmethod
    def _from_reply(cls, thread, data):
        """Build a message from a reply mapping.

        Args:
            thread: The thread the message belongs to.
            data: Mapping with a required ``messageMetadata`` entry. Each item
                of the optional ``attachments`` list carries a JSON document
                under ``mercuryJSON``.
        """
        # Required: the author and timestamp below are read from it
        metadata = data["messageMetadata"]
        tags = metadata.get("tags")

        attachments = []
        unsent = False
        sticker = None
        for attachment in data.get("attachments") or ():
            attachment = _util.parse_json(attachment["mercuryJSON"])
            if attachment.get("blob_attachment"):
                attachments.append(
                    _file.graphql_to_attachment(attachment["blob_attachment"])
                )
            if attachment.get("extensible_attachment"):
                extensible_attachment = graphql_to_extensible_attachment(
                    attachment["extensible_attachment"]
                )
                if isinstance(extensible_attachment, _attachment.UnsentMessage):
                    unsent = True
                elif extensible_attachment:
                    # Unrecognised attachments parse to ``None``; skip those
                    # rather than storing ``None`` in the attachment list.
                    attachments.append(extensible_attachment)
            if attachment.get("sticker_attachment"):
                sticker = _sticker.Sticker._from_graphql(
                    attachment["sticker_attachment"]
                )

        return cls(
            thread=thread,
            id=metadata.get("messageId"),
            author=str(metadata["actorFbId"]),
            created_at=_util.millis_to_datetime(metadata["timestamp"]),
            text=data.get("body"),
            mentions=[
                Mention._from_prng(m)
                for m in _util.parse_json(data.get("data", {}).get("prng", "[]"))
            ],
            emoji_size=EmojiSize._from_tags(tags),
            sticker=sticker,
            attachments=attachments,
            quick_replies=cls._parse_quick_replies(data.get("platform_xmd_encoded")),
            unsent=unsent,
            reply_to_id=data["messageReply"]["replyToMessageId"]["id"]
            if "messageReply" in data
            else None,
            forwarded=cls._get_forwarded_from_tags(tags),
        )

    @classmethod
    def _from_pull(cls, thread, data, author, created_at):
        """Build a message from a pulled message mapping.

        Parsing of mentions and attachments is best-effort: a failure is logged
        and whatever was parsed up to that point is kept.

        Args:
            thread: The thread the message belongs to.
            data: Mapping with a required ``messageMetadata`` entry.
            author: Stored as the message's ``author``.
            created_at: Stored as the message's ``created_at``.
        """
        metadata = data["messageMetadata"]
        tags = metadata.get("tags")

        mentions = []
        if data.get("data") and data["data"].get("prng"):
            try:
                mentions = [
                    Mention._from_prng(m)
                    for m in _util.parse_json(data["data"]["prng"])
                ]
            except Exception:
                log.exception("An exception occured while reading mentions")

        attachments = []
        unsent = False
        sticker = None
        try:
            for a in data.get("attachments") or ():
                mercury = a["mercury"]
                if mercury.get("blob_attachment"):
                    attachment = _file.graphql_to_attachment(
                        mercury["blob_attachment"], a.get("fileSize")
                    )
                    attachments.append(attachment)
                elif mercury.get("sticker_attachment"):
                    sticker = _sticker.Sticker._from_graphql(
                        mercury["sticker_attachment"]
                    )
                elif mercury.get("extensible_attachment"):
                    attachment = graphql_to_extensible_attachment(
                        mercury["extensible_attachment"]
                    )
                    if isinstance(attachment, _attachment.UnsentMessage):
                        unsent = True
                    elif attachment:
                        attachments.append(attachment)
        except Exception:
            log.exception(
                "An exception occured while reading attachments: {}".format(
                    data["attachments"]
                )
            )

        return cls(
            thread=thread,
            id=metadata["messageId"],
            author=author,
            created_at=created_at,
            text=data.get("body"),
            mentions=mentions,
            emoji_size=EmojiSize._from_tags(tags),
            sticker=sticker,
            attachments=attachments,
            unsent=unsent,
            forwarded=cls._get_forwarded_from_tags(tags),
        )
def graphql_to_extensible_attachment(data):
    """Convert an extensible attachment mapping into an attachment object.

    Args:
        data: Mapping that may contain a ``story_attachment`` entry.

    Returns:
        ``None`` when there is no story attachment, or when its target type is
        missing or not one of the handled types. An
        ``_attachment.UnsentMessage`` when the story has no target. Otherwise a
        location, live location or share attachment, chosen by the target's
        ``__typename``.
    """
    story = data.get("story_attachment")
    if not story:
        return None

    target = story.get("target")
    if not target:
        return _attachment.UnsentMessage(id=data.get("legacy_attachment_id"))

    # Unknown types already yield ``None``; treat a target without a
    # ``__typename`` the same way instead of raising ``KeyError``.
    _type = target.get("__typename")
    if _type == "MessageLocation":
        return _location.LocationAttachment._from_graphql(story)
    if _type == "MessageLiveLocation":
        return _location.LiveLocationAttachment._from_graphql(story)
    if _type in ("ExternalUrl", "Story"):
        return _attachment.ShareAttachment._from_graphql(story)
    return None