import attr
import datetime
import requests
import random
import re
import json
# TODO: Only import when required
# Or maybe just replace usage with `html.parser`?
import bs4
from ._common import log, kw_only
from . import _graphql, _util, _exception
from typing import Optional, Mapping, Callable, Any
SERVER_JS_DEFINE_REGEX = re.compile(r'require\("ServerJSDefine"\)\)?\.handleDefines\(')
SERVER_JS_DEFINE_JSON_DECODER = json.JSONDecoder()
def parse_server_js_define(html: str) -> Mapping[str, Any]:
"""Parse ``ServerJSDefine`` entries from a HTML document."""
# Find points where we should start parsing
define_splits = SERVER_JS_DEFINE_REGEX.split(html)
# Skip leading entry
_, *define_splits = define_splits
rtn = []
if not define_splits:
raise _exception.ParseError("Could not find any ServerJSDefine", data=html)
if len(define_splits) < 2:
raise _exception.ParseError("Could not find enough ServerJSDefine", data=html)
if len(define_splits) > 2:
raise _exception.ParseError("Found too many ServerJSDefine", data=define_splits)
# Parse entries (should be two)
for entry in define_splits:
try:
parsed, _ = SERVER_JS_DEFINE_JSON_DECODER.raw_decode(entry, idx=0)
except json.JSONDecodeError as e:
raise _exception.ParseError("Invalid ServerJSDefine", data=entry) from e
if not isinstance(parsed, list):
raise _exception.ParseError("Invalid ServerJSDefine", data=parsed)
rtn.extend(parsed)
# Convert to a dict
return _util.get_jsmods_define(rtn)
def base36encode(number: int) -> str:
"""Convert from Base10 to Base36."""
# Taken from https://en.wikipedia.org/wiki/Base36#Python_implementation
chars = "0123456789abcdefghijklmnopqrstuvwxyz"
sign = "-" if number < 0 else ""
number = abs(number)
result = ""
while number > 0:
number, remainder = divmod(number, 36)
result = chars[remainder] + result
return sign + result
def prefix_url(url: str) -> str:
if url.startswith("/"):
return "https://www.messenger.com" + url
return url
def generate_message_id(now: datetime.datetime, client_id: str) -> str:
k = _util.datetime_to_millis(now)
l = int(random.random() * 4294967295)
return "<{}:{}-{}@mail.projektitan.com>".format(k, l, client_id)
def get_user_id(session: requests.Session) -> str:
# TODO: Optimize this `.get_dict()` call!
cookies = session.cookies.get_dict()
rtn = cookies.get("c_user")
if rtn is None:
raise _exception.ParseError("Could not find user id", data=cookies)
return str(rtn)
def session_factory() -> requests.Session:
from . import __version__
session = requests.session()
session.headers["Referer"] = "https://www.messenger.com/"
# We won't try to set a fake user agent to mask our presence!
# Facebook allows us access anyhow, and it makes our motives clearer:
# We're not trying to cheat Facebook, we simply want to access their service
session.headers["User-Agent"] = "fbchat/{}".format(__version__)
return session
def client_id_factory() -> str:
return hex(int(random.random() * 2 ** 31))[2:]
def find_form_request(html: str):
soup = bs4.BeautifulSoup(html, "html.parser", parse_only=bs4.SoupStrainer("form"))
form = soup.form
if not form:
raise _exception.ParseError("Could not find form to submit", data=soup)
url = form.get("action")
if not url:
raise _exception.ParseError("Could not find url to submit to", data=form)
# From what I've seen, it'll always do this!
if url.startswith("/"):
url = "https://www.facebook.com" + url
# It's okay to set missing values to something crap, the values are localized, and
# hence are not available in the raw HTML
data = {
x["name"]: x.get("value", "[missing]")
for x in form.find_all(["input", "button"])
}
return url, data
def two_factor_helper(session: requests.Session, r, on_2fa_callback):
url, data = find_form_request(r.content.decode("utf-8"))
# You don't have to type a code if your device is already saved
# Repeats if you get the code wrong
while "approvals_code" not in data:
data["approvals_code"] = on_2fa_callback()
log.info("Submitting 2FA code")
r = session.post(url, data=data, allow_redirects=False)
url, data = find_form_request(r.content.decode("utf-8"))
# TODO: Can be missing if checkup flow was done on another device in the meantime?
if "name_action_selected" in data:
data["name_action_selected"] = "save_device"
log.info("Saving browser")
r = session.post(url, data=data, allow_redirects=False)
url, data = find_form_request(r.content.decode("utf-8"))
log.info("Starting Facebook checkup flow")
r = session.post(url, data=data, allow_redirects=False)
url, data = find_form_request(r.content.decode("utf-8"))
if "submit[This was me]" not in data or "submit[This wasn't me]" not in data:
raise _exception.ParseError("Could not fill out form properly (2)", data=data)
data["submit[This was me]"] = "[any value]"
del data["submit[This wasn't me]"]
log.info("Verifying login attempt")
r = session.post(url, data=data, allow_redirects=False)
url, data = find_form_request(r.content.decode("utf-8"))
if "name_action_selected" not in data:
raise _exception.ParseError("Could not fill out form properly (3)", data=data)
data["name_action_selected"] = "save_device"
log.info("Saving device again")
r = session.post(url, data=data, allow_redirects=False)
print(r.status_code, r.url, r.headers)
return r.headers.get("Location")
def get_error_data(html: str) -> Optional[str]:
"""Get error message from a request."""
soup = bs4.BeautifulSoup(
html, "html.parser", parse_only=bs4.SoupStrainer("form", id="login_form")
)
# Attempt to extract and format the error string
# The error message is in the user's own language!
return " ".join(list(soup.stripped_strings)[1:3]) or None
def get_fb_dtsg(define) -> Optional[str]:
if "DTSGInitData" in define:
return define["DTSGInitData"]["token"]
elif "DTSGInitialData" in define:
return define["DTSGInitialData"]["token"]
return None
[docs]@attr.s(slots=True, kw_only=kw_only, repr=False, eq=False)
class Session:
"""Stores and manages state required for most Facebook requests.
This is the main class, which is used to login to Facebook.
"""
_user_id = attr.ib(type=str)
_fb_dtsg = attr.ib(type=str)
_revision = attr.ib(type=int)
_session = attr.ib(factory=session_factory, type=requests.Session)
_counter = attr.ib(0, type=int)
_client_id = attr.ib(factory=client_id_factory, type=str)
@property
def user(self):
"""The logged in user."""
from . import _threads
# TODO: Consider caching the result
return _threads.User(session=self, id=self._user_id)
def __repr__(self) -> str:
# An alternative repr, to illustrate that you can't create the class directly
return "<fbchat.Session user_id={}>".format(self._user_id)
def _get_params(self):
self._counter += 1 # TODO: Make this operation atomic / thread-safe
return {
"__a": 1,
"__req": base36encode(self._counter),
"__rev": self._revision,
"fb_dtsg": self._fb_dtsg,
}
# TODO: Add ability to load previous cookies in here, to avoid 2fa flow
[docs] @classmethod
def login(
cls, email: str, password: str, on_2fa_callback: Callable[[], int] = None
):
"""Login the user, using ``email`` and ``password``.
Args:
email: Facebook ``email``, ``id`` or ``phone number``
password: Facebook account password
on_2fa_callback: Function that will be called, in case a two factor
authentication code is needed. This should return the requested code.
Only tested with SMS codes, might not work with authentication apps.
Note: Facebook limits the amount of codes they will give you, so if you
don't receive a code, be patient, and try again later!
Example:
>>> import fbchat
>>> import getpass
>>> session = fbchat.Session.login(
... input("Email: "),
... getpass.getpass(),
... on_2fa_callback=lambda: input("2FA Code: ")
... )
Email: abc@gmail.com
Password: ****
2FA Code: 123456
>>> session.user.id
"1234"
"""
session = session_factory()
data = {
# "jazoest": "2754",
# "lsd": "AVqqqRUa",
"initial_request_id": "x", # any, just has to be present
# "timezone": "-120",
# "lgndim": "eyJ3IjoxNDQwLCJoIjo5MDAsImF3IjoxNDQwLCJhaCI6ODc3LCJjIjoyNH0=",
# "lgnrnd": "044039_RGm9",
"lgnjs": "n",
"email": email,
"pass": password,
"login": "1",
"persistent": "1", # Changes the cookie type to have a long "expires"
"default_persistent": "0",
}
try:
# Should hit a redirect to https://www.messenger.com/
# If this does happen, the session is logged in!
r = session.post(
"https://www.messenger.com/login/password/",
data=data,
allow_redirects=False,
)
except requests.RequestException as e:
_exception.handle_requests_error(e)
_exception.handle_http_error(r.status_code)
url = r.headers.get("Location")
# We weren't redirected, hence the email or password was wrong
if not url:
error = get_error_data(r.content.decode("utf-8"))
raise _exception.NotLoggedIn(error)
if "checkpoint" in url:
if not on_2fa_callback:
raise _exception.NotLoggedIn(
"2FA code required! Please supply `on_2fa_callback` to .login"
)
# Get a facebook.com url that handles the 2FA flow
# This probably works differently for Messenger-only accounts
url = _util.get_url_parameter(url, "next")
# Explicitly allow redirects
r = session.get(url, allow_redirects=True)
url = two_factor_helper(session, r, on_2fa_callback)
if not url.startswith("https://www.messenger.com/login/auth_token/"):
raise _exception.ParseError("Failed 2fa flow", data=url)
r = session.get(url, allow_redirects=False)
url = r.headers.get("Location")
if url != "https://www.messenger.com/":
error = get_error_data(r.content.decode("utf-8"))
raise _exception.NotLoggedIn("Failed logging in: {}, {}".format(url, error))
try:
return cls._from_session(session=session)
except _exception.NotLoggedIn as e:
raise _exception.ParseError("Failed loading session", data=r) from e
[docs] def is_logged_in(self) -> bool:
"""Send a request to Facebook to check the login status.
Returns:
Whether the user is still logged in
Example:
>>> assert session.is_logged_in()
"""
# Send a request to the login url, to see if we're directed to the home page
try:
r = self._session.get(prefix_url("/login/"), allow_redirects=False)
except requests.RequestException as e:
_exception.handle_requests_error(e)
_exception.handle_http_error(r.status_code)
return "https://www.messenger.com/" == r.headers.get("Location")
[docs] def logout(self) -> None:
"""Safely log out the user.
The session object must not be used after this action has been performed!
Example:
>>> session.logout()
"""
data = {"fb_dtsg": self._fb_dtsg}
try:
r = self._session.post(
prefix_url("/logout/"), data=data, allow_redirects=False
)
except requests.RequestException as e:
_exception.handle_requests_error(e)
_exception.handle_http_error(r.status_code)
if "Location" not in r.headers:
raise _exception.FacebookError("Failed logging out, was not redirected!")
if "https://www.messenger.com/login/" != r.headers["Location"]:
raise _exception.FacebookError(
"Failed logging out, got bad redirect: {}".format(r.headers["Location"])
)
@classmethod
def _from_session(cls, session):
# TODO: Automatically set user_id when the cookie changes in the session
user_id = get_user_id(session)
# Make a request to the main page to retrieve ServerJSDefine entries
try:
r = session.get(prefix_url("/"), allow_redirects=False)
except requests.RequestException as e:
_exception.handle_requests_error(e)
_exception.handle_http_error(r.status_code)
define = parse_server_js_define(r.content.decode("utf-8"))
fb_dtsg = get_fb_dtsg(define)
if fb_dtsg is None:
raise _exception.ParseError("Could not find fb_dtsg", data=define)
if not fb_dtsg:
# Happens when the client is not actually logged in
raise _exception.NotLoggedIn(
"Found empty fb_dtsg, the session was probably invalid."
)
try:
revision = int(define["SiteData"]["client_revision"])
except TypeError:
raise _exception.ParseError("Could not find client revision", data=define)
return cls(user_id=user_id, fb_dtsg=fb_dtsg, revision=revision, session=session)
[docs] def get_cookies(self) -> Mapping[str, str]:
"""Retrieve session cookies, that can later be used in `from_cookies`.
Returns:
A dictionary containing session cookies
Example:
>>> cookies = session.get_cookies()
"""
return self._session.cookies.get_dict()
[docs] @classmethod
def from_cookies(cls, cookies: Mapping[str, str]):
"""Load a session from session cookies.
Args:
cookies: A dictionary containing session cookies
Example:
>>> cookies = session.get_cookies()
>>> # Store cookies somewhere, and then subsequently
>>> session = fbchat.Session.from_cookies(cookies)
"""
session = session_factory()
session.cookies = requests.cookies.merge_cookies(session.cookies, cookies)
return cls._from_session(session=session)
def _post(self, url, data, files=None, as_graphql=False):
data.update(self._get_params())
try:
r = self._session.post(prefix_url(url), data=data, files=files)
except requests.RequestException as e:
_exception.handle_requests_error(e)
# Facebook's encoding is always UTF-8
r.encoding = "utf-8"
_exception.handle_http_error(r.status_code)
if r.text is None or len(r.text) == 0:
raise _exception.HTTPError("Error when sending request: Got empty response")
if as_graphql:
return _graphql.response_to_json(r.text)
else:
text = _util.strip_json_cruft(r.text)
j = _util.parse_json(text)
log.debug(j)
return j
def _payload_post(self, url, data, files=None):
j = self._post(url, data, files=files)
_exception.handle_payload_error(j)
# update fb_dtsg token if received in response
if "jsmods" in j:
define = _util.get_jsmods_define(j["jsmods"]["define"])
fb_dtsg = get_fb_dtsg(define)
if fb_dtsg:
self._fb_dtsg = fb_dtsg
try:
return j["payload"]
except (KeyError, TypeError) as e:
raise _exception.ParseError("Missing payload", data=j) from e
def _graphql_requests(self, *queries):
# TODO: Explain usage of GraphQL, probably in the docs
# Perhaps provide this API as public?
data = {
"method": "GET",
"response_format": "json",
"queries": _graphql.queries_to_json(*queries),
}
return self._post("/api/graphqlbatch/", data, as_graphql=True)
def _do_send_request(self, data):
now = datetime.datetime.utcnow()
offline_threading_id = _util.generate_offline_threading_id()
data["client"] = "mercury"
data["author"] = "fbid:{}".format(self._user_id)
data["timestamp"] = _util.datetime_to_millis(now)
data["source"] = "source:chat:web"
data["offline_threading_id"] = offline_threading_id
data["message_id"] = offline_threading_id
data["threading_id"] = generate_message_id(now, self._client_id)
data["ephemeral_ttl_mode:"] = "0"
j = self._post("/messaging/send/", data)
_exception.handle_payload_error(j)
try:
message_ids = [
(action["message_id"], action["thread_fbid"])
for action in j["payload"]["actions"]
if "message_id" in action
]
if len(message_ids) != 1:
log.warning("Got multiple message ids' back: {}".format(message_ids))
return message_ids[0]
except (KeyError, IndexError, TypeError) as e:
raise _exception.ParseError("No message IDs could be found", data=j) from e