# Source code for fbchat._session

import attr
import datetime
import requests
import random
import re
import json

# TODO: Only import when required
# Or maybe just replace usage with `html.parser`?
import bs4

from ._common import log, kw_only
from . import _graphql, _util, _exception

from typing import Optional, Mapping, Callable, Any


# Pattern marking the places in a page where a block of "define" data starts.
# It has two alternatives:
#   1. `require("ServerJS")`, at most 100 arbitrary characters, `.handle({`, anything,
#      and then `"define":`
#   2. `("ServerJSDefine")`, an optional extra `)`, and then `.handleDefines(`
# Note that the leading `require` literal only belongs to the first alternative.
# There are no capturing groups, so `.split` yields only the text between matches.
SERVER_JS_DEFINE_REGEX = re.compile(
    r'require(?:\("ServerJS"\).{,100}\.handle\({.*"define":)|(?:\("ServerJSDefine"\)\)?\.handleDefines\()'
)
# Decoder instance reused for every parse; `raw_decode` reads one JSON value from the
# start of a string, and tolerates trailing non-JSON text after it.
SERVER_JS_DEFINE_JSON_DECODER = json.JSONDecoder()


def parse_server_js_define(html: str) -> Mapping[str, Any]:
    """Extract the ``ServerJSDefine`` data embedded in a HTML document.

    The document is split on ``SERVER_JS_DEFINE_REGEX``, and a JSON list is decoded
    from the start of each piece following a match. Exactly two such pieces are
    expected; their lists are concatenated and handed to
    ``_util.get_jsmods_define``.

    Raises:
        ParseError: If there are not exactly two entries, or if an entry does not
            start with a JSON list.
    """
    # TODO: Extract jsmods "require" and "define" from `bigPipe.onPageletArrive`?

    # Whatever comes before the first match is not a define entry, so drop it
    chunks = SERVER_JS_DEFINE_REGEX.split(html)[1:]

    count = len(chunks)
    if count == 0:
        raise _exception.ParseError("Could not find any ServerJSDefine", data=html)
    if count == 1:
        raise _exception.ParseError("Could not find enough ServerJSDefine", data=html)
    if count > 2:
        raise _exception.ParseError("Found too many ServerJSDefine", data=chunks)

    # Decode the (two) entries, only reading the leading JSON value of each
    entries = []
    for chunk in chunks:
        try:
            decoded = SERVER_JS_DEFINE_JSON_DECODER.raw_decode(chunk, idx=0)[0]
        except json.JSONDecodeError as e:
            raise _exception.ParseError("Invalid ServerJSDefine", data=chunk) from e
        if not isinstance(decoded, list):
            raise _exception.ParseError("Invalid ServerJSDefine", data=decoded)
        entries += decoded

    # Convert to a dict
    return _util.get_jsmods_define(entries)


def base36encode(number: int) -> str:
    """Convert an integer to its base 36 string representation.

    The digits are ``0-9`` followed by lowercase ``a-z``. Negative numbers are
    prefixed with ``-``.

    Args:
        number: The integer to convert

    Returns:
        The base 36 representation; never an empty string

    Example:
        >>> base36encode(35)
        'z'
        >>> base36encode(36)
        '10'
        >>> base36encode(0)
        '0'
    """
    # Taken from https://en.wikipedia.org/wiki/Base36#Python_implementation
    chars = "0123456789abcdefghijklmnopqrstuvwxyz"

    if number == 0:
        # The digit loop below never runs for zero, which would otherwise produce ""
        return chars[0]

    sign = "-" if number < 0 else ""
    number = abs(number)
    result = ""

    # Peel off the least significant digit until nothing is left
    while number > 0:
        number, remainder = divmod(number, 36)
        result = chars[remainder] + result

    return sign + result


def prefix_url(url: str) -> str:
    """Turn a path starting with ``/`` into an absolute Messenger URL.

    Any other value is returned unchanged.
    """
    is_site_relative = url.startswith("/")
    return ("https://www.messenger.com" + url) if is_site_relative else url


def generate_message_id(now: datetime.datetime, client_id: str) -> str:
    """Build an id of the form ``<millis:random-client_id@mail.projektitan.com>``.

    ``millis`` is whatever ``_util.datetime_to_millis`` returns for ``now``, and
    ``random`` is a random non-negative integer below ``4294967295``.
    """
    template = "<{}:{}-{}@mail.projektitan.com>"
    millis = _util.datetime_to_millis(now)
    nonce = int(random.random() * 4294967295)
    return template.format(millis, nonce, client_id)


def get_user_id(session: requests.Session) -> str:
    """Return the value of the session's ``c_user`` cookie, as a string.

    Raises:
        ParseError: If the session has no ``c_user`` cookie.
    """
    # TODO: Optimize this `.get_dict()` call!
    jar = session.cookies.get_dict()
    user_id = jar.get("c_user")
    if user_id is not None:
        return str(user_id)
    raise _exception.ParseError("Could not find user id", data=jar)


def session_factory() -> requests.Session:
    """Create a new ``requests`` session with fbchat's ``Referer`` and ``User-Agent``."""
    from . import __version__

    default_headers = {
        "Referer": "https://www.messenger.com/",
        # We won't try to set a fake user agent to mask our presence!
        # Facebook allows us access anyhow, and it makes our motives clearer:
        # We're not trying to cheat Facebook, we simply want to access their service
        "User-Agent": "fbchat/{}".format(__version__),
    }

    new_session = requests.session()
    new_session.headers.update(default_headers)
    return new_session


def client_id_factory() -> str:
    """Return a random integer below ``2 ** 31``, as lowercase hex without ``0x``."""
    value = int(random.random() * 2 ** 31)
    return format(value, "x")


def find_form_request(html: str):
    """Find the first ``<form>`` in ``html``, and collect what is needed to submit it.

    Args:
        html: The HTML document to search

    Returns:
        A ``(url, data)`` tuple. ``url`` is the form's ``action``, prefixed with
        ``https://www.facebook.com`` if it starts with ``/``. ``data`` maps the
        ``name`` of every named ``<input>`` / ``<button>`` in the form to its
        ``value``, or to ``"[missing]"`` if it has no ``value`` attribute.

    Raises:
        ParseError: If no form, or no form action, could be found.
    """
    # Only build a tree for the <form> elements, the rest of the page is not needed
    soup = bs4.BeautifulSoup(html, "html.parser", parse_only=bs4.SoupStrainer("form"))

    form = soup.form
    if not form:
        raise _exception.ParseError("Could not find form to submit", data=html)

    url = form.get("action")
    if not url:
        raise _exception.ParseError("Could not find url to submit to", data=form)

    # From what I've seen, it'll always do this!
    if url.startswith("/"):
        url = "https://www.facebook.com" + url

    # It's okay to set missing values to something crap, the values are localized, and
    # hence are not available in the raw HTML
    data = {
        field["name"]: field.get("value", "[missing]")
        for field in form.find_all(["input", "button"])
        # Controls without a name are not part of a form submission, and looking up
        # their ["name"] would fail with a KeyError
        if field.get("name")
    }
    return url, data


def two_factor_helper(session: requests.Session, r, on_2fa_callback):
    """Walk through the checkpoint forms that follow a login requiring 2FA.

    Each step parses the form in the latest response with `find_form_request`,
    adjusts the form data, and posts it back without following redirects.

    Args:
        session: The session used to submit the forms
        r: The response containing the first form
        on_2fa_callback: Called without arguments every time a 2FA code is needed;
            its return value is submitted as the ``approvals_code`` field

    Returns:
        The ``Location`` header of the last response. This is ``None`` if that
        response had no such header.

    Raises:
        NotLoggedIn: If a form containing ``verification_method`` is encountered.
        ParseError: If an expected form or form field is missing.
    """
    url, data = find_form_request(r.content.decode("utf-8"))

    # You don't have to type a code if your device is already saved
    # Repeats if you get the code wrong
    while "approvals_code" in data:
        data["approvals_code"] = on_2fa_callback()
        log.info("Submitting 2FA code")
        r = session.post(url, data=data, allow_redirects=False)
        log.debug("2FA location: %s", r.headers.get("Location"))
        url, data = find_form_request(r.content.decode("utf-8"))

    # TODO: Can be missing if checkup flow was done on another device in the meantime?
    if "name_action_selected" in data:
        data["name_action_selected"] = "save_device"
        log.info("Saving browser")
        r = session.post(url, data=data, allow_redirects=False)
        log.debug("2FA location: %s", r.headers.get("Location"))
        url = r.headers.get("Location")
        # A redirect to the auth token url means that the flow is already complete
        if url and url.startswith("https://www.messenger.com/login/auth_token/"):
            return url
        # Otherwise the response should contain the next form of the flow
        url, data = find_form_request(r.content.decode("utf-8"))

    # The current form is submitted as is
    log.info("Starting Facebook checkup flow")
    r = session.post(url, data=data, allow_redirects=False)
    log.debug("2FA location: %s", r.headers.get("Location"))

    url, data = find_form_request(r.content.decode("utf-8"))
    if "verification_method" in data:
        raise _exception.NotLoggedIn(
            "Your account is locked, and you need to log in using a browser, and verify it there!"
        )
    # Both submit buttons must be present, otherwise this is not the expected form
    if "submit[This was me]" not in data or "submit[This wasn't me]" not in data:
        raise _exception.ParseError("Could not fill out form properly (2)", data=data)
    # Only send the "This was me" button, as if that was the one that was pressed
    data["submit[This was me]"] = "[any value]"
    del data["submit[This wasn't me]"]
    log.info("Verifying login attempt")
    r = session.post(url, data=data, allow_redirects=False)
    log.debug("2FA location: %s", r.headers.get("Location"))

    url, data = find_form_request(r.content.decode("utf-8"))
    if "name_action_selected" not in data:
        raise _exception.ParseError("Could not fill out form properly (3)", data=data)
    data["name_action_selected"] = "save_device"
    log.info("Saving device again")
    r = session.post(url, data=data, allow_redirects=False)
    log.debug("2FA location: %s", r.headers.get("Location"))
    # The caller is responsible for validating this (possibly missing) redirect url
    return r.headers.get("Location")


def get_error_data(html: str) -> Optional[str]:
    """Extract an error message from the ``login_form`` form of a HTML response.

    Returns:
        The second and third stripped text fragments inside the form, joined by a
        space, or ``None`` if that would be an empty string.
    """
    login_form_only = bs4.SoupStrainer("form", id="login_form")
    soup = bs4.BeautifulSoup(html, "html.parser", parse_only=login_form_only)

    # Attempt to extract and format the error string
    # The error message is in the user's own language!
    fragments = list(soup.stripped_strings)
    message = " ".join(fragments[1:3])
    return message if message else None


def get_fb_dtsg(define) -> Optional[str]:
    """Look up the ``fb_dtsg`` token in parsed "define" data.

    ``DTSGInitData`` is preferred over ``DTSGInitialData``. Returns ``None`` if
    neither key is present.
    """
    for key in ("DTSGInitData", "DTSGInitialData"):
        if key in define:
            return define[key]["token"]
    return None


@attr.s(slots=True, kw_only=kw_only, repr=False, eq=False)
class Session:
    """Stores and manages state required for most Facebook requests.

    This is the main class, which is used to login to Facebook.
    """

    # Id of the logged in user, read from the ``c_user`` cookie in `_from_session`
    _user_id = attr.ib(type=str)
    # Token sent as ``fb_dtsg`` with every request; refreshed in `_payload_post`
    _fb_dtsg = attr.ib(type=str)
    # Sent as ``__rev`` with every request
    _revision = attr.ib(type=int)
    # The underlying HTTP session, which holds the cookies
    _session = attr.ib(factory=session_factory, type=requests.Session)
    # Number of requests made so far, sent (base 36 encoded) as ``__req``
    _counter = attr.ib(0, type=int)
    # Random id used when generating message ids in `_do_send_request`
    _client_id = attr.ib(factory=client_id_factory, type=str)

    @property
    def user(self):
        """The logged in user."""
        from . import _threads

        # TODO: Consider caching the result

        return _threads.User(session=self, id=self._user_id)

    def __repr__(self) -> str:
        # An alternative repr, to illustrate that you can't create the class directly
        return "<fbchat.Session user_id={}>".format(self._user_id)

    def _get_params(self):
        """Return the parameters that `_post` adds to every request.

        Increments the request counter as a side effect.
        """
        self._counter += 1  # TODO: Make this operation atomic / thread-safe
        return {
            "__a": 1,
            "__req": base36encode(self._counter),
            "__rev": self._revision,
            "fb_dtsg": self._fb_dtsg,
        }

    # TODO: Add ability to load previous cookies in here, to avoid 2fa flow
    @classmethod
    def login(
        cls,
        email: str,
        password: str,
        on_2fa_callback: Optional[Callable[[], int]] = None,
    ):
        """Login the user, using ``email`` and ``password``.

        Args:
            email: Facebook ``email``, ``id`` or ``phone number``
            password: Facebook account password
            on_2fa_callback: Function that will be called, in case a two factor
                authentication code is needed. This should return the requested code.

                Tested using SMS and authentication applications. If you have both
                enabled, you might not receive an SMS code, and you'll have to use the
                authentication application.

                Note: Facebook limits the amount of codes they will give you, so if you
                don't receive a code, be patient, and try again later!

        Raises:
            NotLoggedIn: If the login was rejected, or if a 2FA code is required, but
                ``on_2fa_callback`` was not supplied.
            ParseError: If a step of the login flow gave an unexpected response.

        Example:
            >>> import fbchat
            >>> import getpass
            >>> session = fbchat.Session.login(
            ...     input("Email: "),
            ...     getpass.getpass(),
            ...     on_2fa_callback=lambda: input("2FA Code: ")
            ... )
            Email: abc@gmail.com
            Password: ****
            2FA Code: 123456
            >>> session.user.id
            "1234"
        """
        session = session_factory()

        # The commented out entries are other fields that have been seen in the login
        # form, they are not sent
        data = {
            # "jazoest": "2754",
            # "lsd": "AVqqqRUa",
            "initial_request_id": "x",  # any, just has to be present
            # "timezone": "-120",
            # "lgndim": "eyJ3IjoxNDQwLCJoIjo5MDAsImF3IjoxNDQwLCJhaCI6ODc3LCJjIjoyNH0=",
            # "lgnrnd": "044039_RGm9",
            "lgnjs": "n",
            "email": email,
            "pass": password,
            "login": "1",
            "persistent": "1",  # Changes the cookie type to have a long "expires"
            "default_persistent": "0",
        }

        try:
            # Should hit a redirect to https://www.messenger.com/
            # If this does happen, the session is logged in!
            r = session.post(
                "https://www.messenger.com/login/password/",
                data=data,
                allow_redirects=False,
            )
        except requests.RequestException as e:
            _exception.handle_requests_error(e)
        _exception.handle_http_error(r.status_code)

        url = r.headers.get("Location")

        # We weren't redirected, hence the email or password was wrong
        if not url:
            error = get_error_data(r.content.decode("utf-8"))
            raise _exception.NotLoggedIn(error)

        if "checkpoint" in url:
            if not on_2fa_callback:
                raise _exception.NotLoggedIn(
                    "2FA code required! Please supply `on_2fa_callback` to .login"
                )
            # Get a facebook.com/checkpoint/start url that handles the 2FA flow
            # This probably works differently for Messenger-only accounts
            url = _util.get_url_parameter(url, "next")
            # Also guard against a missing url, so that the failure is a ParseError
            if not url or not url.startswith(
                "https://www.facebook.com/checkpoint/start/"
            ):
                raise _exception.ParseError("Failed 2fa flow (1)", data=url)

            r = session.get(url, allow_redirects=False)
            url = r.headers.get("Location")
            if not url or not url.startswith("https://www.facebook.com/checkpoint/"):
                raise _exception.ParseError("Failed 2fa flow (2)", data=url)

            r = session.get(url, allow_redirects=False)
            url = two_factor_helper(session, r, on_2fa_callback)

            # `two_factor_helper` returns a `Location` header, which may be missing
            if not url or not url.startswith(
                "https://www.messenger.com/login/auth_token/"
            ):
                raise _exception.ParseError("Failed 2fa flow (3)", data=url)

            r = session.get(url, allow_redirects=False)
            url = r.headers.get("Location")

        if url != "https://www.messenger.com/":
            error = get_error_data(r.content.decode("utf-8"))
            raise _exception.NotLoggedIn("Failed logging in: {}, {}".format(url, error))

        try:
            return cls._from_session(session=session)
        except _exception.NotLoggedIn as e:
            raise _exception.ParseError("Failed loading session", data=r) from e

    def is_logged_in(self) -> bool:
        """Send a request to Facebook to check the login status.

        Returns:
            Whether the user is still logged in

        Example:
            >>> assert session.is_logged_in()
        """
        # Send a request to the login url, to see if we're directed to the home page
        try:
            r = self._session.get(prefix_url("/login/"), allow_redirects=False)
        except requests.RequestException as e:
            _exception.handle_requests_error(e)
        _exception.handle_http_error(r.status_code)
        return "https://www.messenger.com/" == r.headers.get("Location")

    def logout(self) -> None:
        """Safely log out the user.

        The session object must not be used after this action has been performed!

        Raises:
            FacebookError: If the request was not redirected to the login page.

        Example:
            >>> session.logout()
        """
        data = {"fb_dtsg": self._fb_dtsg}
        try:
            r = self._session.post(
                prefix_url("/logout/"), data=data, allow_redirects=False
            )
        except requests.RequestException as e:
            _exception.handle_requests_error(e)
        _exception.handle_http_error(r.status_code)

        if "Location" not in r.headers:
            raise _exception.FacebookError("Failed logging out, was not redirected!")
        if "https://www.messenger.com/login/" != r.headers["Location"]:
            raise _exception.FacebookError(
                "Failed logging out, got bad redirect: {}".format(r.headers["Location"])
            )

    @classmethod
    def _from_session(cls, session):
        """Create an instance from a ``requests`` session that holds login cookies.

        Fetches the main page, and reads the ``fb_dtsg`` token and client revision
        from the ``ServerJSDefine`` data in it.

        Raises:
            ParseError: If the user id, ``fb_dtsg`` or client revision can't be found.
            NotLoggedIn: If the ``fb_dtsg`` token was found, but is empty.
        """
        # TODO: Automatically set user_id when the cookie changes in the session
        user_id = get_user_id(session)

        # Make a request to the main page to retrieve ServerJSDefine entries
        try:
            r = session.get(prefix_url("/"), allow_redirects=False)
        except requests.RequestException as e:
            _exception.handle_requests_error(e)
        _exception.handle_http_error(r.status_code)

        define = parse_server_js_define(r.content.decode("utf-8"))

        fb_dtsg = get_fb_dtsg(define)
        if fb_dtsg is None:
            raise _exception.ParseError("Could not find fb_dtsg", data=define)
        if not fb_dtsg:
            # Happens when the client is not actually logged in
            raise _exception.NotLoggedIn(
                "Found empty fb_dtsg, the session was probably invalid."
            )

        try:
            revision = int(define["SiteData"]["client_revision"])
        except (KeyError, TypeError, ValueError) as e:
            # A missing key or a non-numeric value is reported the same way as a
            # value of the wrong type
            raise _exception.ParseError(
                "Could not find client revision", data=define
            ) from e

        return cls(user_id=user_id, fb_dtsg=fb_dtsg, revision=revision, session=session)

    def get_cookies(self) -> Mapping[str, str]:
        """Retrieve session cookies, that can later be used in `from_cookies`.

        Returns:
            A dictionary containing session cookies

        Example:
            >>> cookies = session.get_cookies()
        """
        return self._session.cookies.get_dict()

    @classmethod
    def from_cookies(cls, cookies: Mapping[str, str]):
        """Load a session from session cookies.

        Args:
            cookies: A dictionary containing session cookies

        Example:
            >>> cookies = session.get_cookies()
            >>> # Store cookies somewhere, and then subsequently
            >>> session = fbchat.Session.from_cookies(cookies)
        """
        session = session_factory()
        session.cookies = requests.cookies.merge_cookies(session.cookies, cookies)
        return cls._from_session(session=session)

    def _post(self, url, data, files=None, as_graphql=False):
        """POST ``data`` to ``url``, and return the parsed response.

        The parameters from `_get_params` are added to ``data`` in place. The
        response is parsed with ``_graphql.response_to_json`` if ``as_graphql`` is
        set, and otherwise with ``_util.strip_json_cruft`` + ``_util.parse_json``.

        Raises:
            HTTPError: If the response body is empty.
        """
        data.update(self._get_params())
        try:
            r = self._session.post(prefix_url(url), data=data, files=files)
        except requests.RequestException as e:
            _exception.handle_requests_error(e)
        # Facebook's encoding is always UTF-8
        r.encoding = "utf-8"
        _exception.handle_http_error(r.status_code)
        if not r.text:
            raise _exception.HTTPError("Error when sending request: Got empty response")
        if as_graphql:
            return _graphql.response_to_json(r.text)
        text = _util.strip_json_cruft(r.text)
        j = _util.parse_json(text)
        log.debug(j)
        return j

    def _payload_post(self, url, data, files=None):
        """POST ``data`` to ``url``, and return the ``payload`` of the response.

        Raises:
            ParseError: If the response has no ``payload``.
        """
        j = self._post(url, data, files=files)
        _exception.handle_payload_error(j)

        # update fb_dtsg token if received in response
        if "jsmods" in j:
            define = _util.get_jsmods_define(j["jsmods"]["define"])
            fb_dtsg = get_fb_dtsg(define)
            if fb_dtsg:
                self._fb_dtsg = fb_dtsg

        try:
            return j["payload"]
        except (KeyError, TypeError) as e:
            raise _exception.ParseError("Missing payload", data=j) from e

    def _graphql_requests(self, *queries):
        """Send ``queries`` as a single request to the GraphQL batch endpoint."""
        # TODO: Explain usage of GraphQL, probably in the docs
        # Perhaps provide this API as public?
        data = {
            "method": "GET",
            "response_format": "json",
            "queries": _graphql.queries_to_json(*queries),
        }
        return self._post("/api/graphqlbatch/", data, as_graphql=True)

    def _do_send_request(self, data):
        """Fill in the common message fields in ``data``, and send the message.

        ``data`` is modified in place.

        Returns:
            A ``(message_id, thread_fbid)`` tuple, taken from the first action in the
            response that has a message id

        Raises:
            ParseError: If no message id could be found in the response.
        """
        # NOTE(review): `utcnow()` returns a naive datetime; confirm that
        # `_util.datetime_to_millis` interprets naive values as UTC
        now = datetime.datetime.utcnow()
        offline_threading_id = _util.generate_offline_threading_id()
        data["client"] = "mercury"
        data["author"] = "fbid:{}".format(self._user_id)
        data["timestamp"] = _util.datetime_to_millis(now)
        data["source"] = "source:chat:web"
        data["offline_threading_id"] = offline_threading_id
        data["message_id"] = offline_threading_id
        data["threading_id"] = generate_message_id(now, self._client_id)
        data["ephemeral_ttl_mode:"] = "0"
        j = self._post("/messaging/send/", data)
        _exception.handle_payload_error(j)
        try:
            message_ids = [
                (action["message_id"], action["thread_fbid"])
                for action in j["payload"]["actions"]
                if "message_id" in action
            ]
            if len(message_ids) != 1:
                log.warning("Got multiple message ids' back: {}".format(message_ids))
            # An empty list ends up in the IndexError handler below
            return message_ids[0]
        except (KeyError, IndexError, TypeError) as e:
            raise _exception.ParseError("No message IDs could be found", data=j) from e