# Source code for fbchat.utils

# -*- coding: UTF-8 -*-

from __future__ import unicode_literals
import re
import json
from time import time
from random import random
from contextlib import contextmanager
from mimetypes import guess_type
from os.path import basename
import warnings
import logging
import requests
import aenum
from .models import *

# Python 2/3 compatibility: Python 3 exposes the URL helpers in `urllib.parse`,
# while Python 2 splits them between the `urllib` and `urlparse` modules.
try:
    from urllib.parse import urlencode, parse_qs, urlparse
    # Python 3 has no `basestring` builtin; bind the name to a tuple of types instead
    basestring = (str, bytes)
except ImportError:
    from urllib import urlencode
    from urlparse import parse_qs, urlparse
    # Re-bind the Python 2 builtin so `basestring` is a module-level name on both versions
    basestring = basestring

# Python 2's `input` executes the input, whereas `raw_input` just returns the input.
# Rebind `input` to `raw_input` when it exists, so `input` returns the raw text
# on both versions.
try:
    input = raw_input
except NameError:
    # `raw_input` does not exist (Python 3): keep the builtin `input` untouched
    pass

# Log settings
# Module-wide logger named "client"; its level is set to DEBUG at import time.
log = logging.getLogger("client")
log.setLevel(logging.DEBUG)
# Creates the console handler (a `StreamHandler` with default arguments; no
# level or formatter is configured on the handler itself)
handler = logging.StreamHandler()
log.addHandler(handler)

#: Default list of user agents
#: Each entry is a complete browser `User-Agent` header value.
#: NOTE(review): presumably one entry is picked per session by the client; confirm against the caller.
USER_AGENTS = [
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.90 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_3) AppleWebKit/601.1.10 (KHTML, like Gecko) Version/8.0.5 Safari/601.1.10",
    "Mozilla/5.0 (Windows NT 6.3; WOW64; ; NCT50_AAP285C84A1328) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.90 Safari/537.36",
    "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1",
    "Mozilla/5.0 (X11; CrOS i686 2268.111.0) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.57 Safari/536.11",
    "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.6 (KHTML, like Gecko) Chrome/20.0.1092.0 Safari/536.6"
]

# Maps an emoji size name, or its one-letter abbreviation, to the matching
# `EmojiSize` member. `get_emojisize_from_tags` looks up the part after
# 'hot_emoji_size:' in this table.
LIKES = {
    'large': EmojiSize.LARGE,
    'medium': EmojiSize.MEDIUM,
    'small': EmojiSize.SMALL,
    'l': EmojiSize.LARGE,
    'm': EmojiSize.MEDIUM,
    's': EmojiSize.SMALL
}


# Maps gender identifiers to a common set of gender names. Integer keys and
# upper-case string keys resolve to the same names, so both identifier styles
# can be looked up in this one table.
GENDERS = {
    # For standard requests
    0: 'unknown',
    1: 'female_singular',
    2: 'male_singular',
    3: 'female_singular_guess',
    4: 'male_singular_guess',
    5: 'mixed',
    6: 'neuter_singular',
    7: 'unknown_singular',
    8: 'female_plural',
    9: 'male_plural',
    10: 'neuter_plural',
    11: 'unknown_plural',

    # For graphql requests
    # (the commented-out entries below are names with no known graphql key)
    'UNKNOWN': 'unknown',
    'FEMALE': 'female_singular',
    'MALE': 'male_singular',
    #'': 'female_singular_guess',
    #'': 'male_singular_guess',
    #'': 'mixed',
    'NEUTER': 'neuter_singular',
    #'': 'unknown_singular',
    #'': 'female_plural',
    #'': 'male_plural',
    #'': 'neuter_plural',
    #'': 'unknown_plural',
}

class ReqUrl(object):
    """A class containing all urls used by `fbchat`

    Every URL is a class attribute. `STICKY` and `PING` embed the current pull
    channel number in their host name; `change_pull_channel` overrides both on
    the instance it is called on and leaves the class-level defaults untouched.
    """
    SEARCH = "https://www.facebook.com/ajax/typeahead/search.php"
    LOGIN = "https://m.facebook.com/login.php?login_attempt=1"
    SEND = "https://www.facebook.com/messaging/send/"
    UNREAD_THREADS = "https://www.facebook.com/ajax/mercury/unread_threads.php"
    UNSEEN_THREADS = "https://www.facebook.com/mercury/unseen_thread_ids/"
    THREADS = "https://www.facebook.com/ajax/mercury/threadlist_info.php"
    MOVE_THREAD = "https://www.facebook.com/ajax/mercury/move_thread.php"
    ARCHIVED_STATUS = "https://www.facebook.com/ajax/mercury/change_archived_status.php?dpr=1"
    PINNED_STATUS = "https://www.facebook.com/ajax/mercury/change_pinned_status.php?dpr=1"
    MESSAGES = "https://www.facebook.com/ajax/mercury/thread_info.php"
    READ_STATUS = "https://www.facebook.com/ajax/mercury/change_read_status.php"
    DELIVERED = "https://www.facebook.com/ajax/mercury/delivery_receipts.php"
    MARK_SEEN = "https://www.facebook.com/ajax/mercury/mark_seen.php"
    BASE = "https://www.facebook.com"
    MOBILE = "https://m.facebook.com/"
    STICKY = "https://0-edge-chat.facebook.com/pull"
    PING = "https://0-edge-chat.facebook.com/active_ping"
    UPLOAD = "https://upload.facebook.com/ajax/mercury/upload.php"
    INFO = "https://www.facebook.com/chat/user_info/"
    CONNECT = "https://www.facebook.com/ajax/add_friend/action.php?dpr=1"
    REMOVE_USER = "https://www.facebook.com/chat/remove_participants/"
    LOGOUT = "https://www.facebook.com/logout.php"
    ALL_USERS = "https://www.facebook.com/chat/user_info_all"
    SAVE_DEVICE = "https://m.facebook.com/login/save-device/cancel/"
    CHECKPOINT = "https://m.facebook.com/login/checkpoint/"
    THREAD_COLOR = "https://www.facebook.com/messaging/save_thread_color/?source=thread_settings&dpr=1"
    THREAD_NICKNAME = "https://www.facebook.com/messaging/save_thread_nickname/?source=thread_settings&dpr=1"
    THREAD_EMOJI = "https://www.facebook.com/messaging/save_thread_emoji/?source=thread_settings&dpr=1"
    THREAD_IMAGE = "https://www.facebook.com/messaging/set_thread_image/?dpr=1"
    THREAD_NAME = "https://www.facebook.com/messaging/set_thread_name/?dpr=1"
    MESSAGE_REACTION = "https://www.facebook.com/webgraphql/mutation"
    TYPING = "https://www.facebook.com/ajax/messaging/typ.php"
    GRAPHQL = "https://www.facebook.com/api/graphqlbatch/"
    ATTACHMENT_PHOTO = "https://www.facebook.com/mercury/attachments/photo/"
    PLAN_CREATE = "https://www.facebook.com/ajax/eventreminder/create"
    PLAN_INFO = "https://www.facebook.com/ajax/eventreminder"
    PLAN_CHANGE = "https://www.facebook.com/ajax/eventreminder/submit"
    PLAN_PARTICIPATION = "https://www.facebook.com/ajax/eventreminder/rsvp"
    MODERN_SETTINGS_MENU = "https://www.facebook.com/bluebar/modern_settings_menu/"
    REMOVE_FRIEND = "https://m.facebook.com/a/removefriend.php"
    BLOCK_USER = "https://www.facebook.com/messaging/block_messages/?dpr=1"
    UNBLOCK_USER = "https://www.facebook.com/messaging/unblock_messages/?dpr=1"
    SAVE_ADMINS = "https://www.facebook.com/messaging/save_admins/?dpr=1"
    APPROVAL_MODE = "https://www.facebook.com/messaging/set_approval_mode/?dpr=1"
    CREATE_GROUP = "https://m.facebook.com/messages/send/?icm=1"
    DELETE_THREAD = "https://www.facebook.com/ajax/mercury/delete_thread.php?dpr=1"
    DELETE_MESSAGES = "https://www.facebook.com/ajax/mercury/delete_messages.php?dpr=1"
    MUTE_THREAD = "https://www.facebook.com/ajax/mercury/change_mute_thread.php?dpr=1"
    MUTE_REACTIONS = "https://www.facebook.com/ajax/mercury/change_reactions_mute_thread/?dpr=1"
    MUTE_MENTIONS = "https://www.facebook.com/ajax/mercury/change_mentions_mute_thread/?dpr=1"
    CREATE_POLL = "https://www.facebook.com/messaging/group_polling/create_poll/?dpr=1"
    UPDATE_VOTE = "https://www.facebook.com/messaging/group_polling/update_vote/?dpr=1"
    GET_POLL_OPTIONS = "https://www.facebook.com/ajax/mercury/get_poll_options"
    SEARCH_MESSAGES = "https://www.facebook.com/ajax/mercury/search_snippets.php?dpr=1"
    MARK_SPAM = "https://www.facebook.com/ajax/mercury/mark_spam.php?dpr=1"
    UNSEND = "https://www.facebook.com/messaging/unsend_message/?dpr=1"

    # Number embedded in the host name of the STICKY and PING urls
    pull_channel = 0

    def change_pull_channel(self, channel=None):
        """Switch the pull channel and rebuild the `STICKY` and `PING` urls.

        :param channel: Channel number to use. If ``None``, the current channel
            is advanced by one, wrapping around after 4.
        """
        if channel is None:
            self.pull_channel = (self.pull_channel + 1) % 5  # Pull channel will be 0-4
        else:
            self.pull_channel = channel
        # Assigned on the instance, so the class-level defaults stay unchanged
        self.STICKY = "https://{}-edge-chat.facebook.com/pull".format(self.pull_channel)
        self.PING = "https://{}-edge-chat.facebook.com/active_ping".format(self.pull_channel)
# Encoding used to decode the raw bytes of responses
facebookEncoding = 'UTF-8'


def now():
    """Return the current time as an integer number of milliseconds."""
    return int(time()*1000)


def strip_to_json(text):
    """Return `text` from its first ``{`` onwards.

    :raises FBchatException: If `text` contains no ``{`` at all
    """
    try:
        return text[text.index('{'):]
    except ValueError:
        raise FBchatException('No JSON object found: {!r}'.format(text))


def get_decoded_r(r):
    """Decode the raw body of the response object `r`.

    Reads the response's private ``_content`` attribute directly.
    """
    return get_decoded(r._content)


def get_decoded(content):
    """Decode the bytes `content` using `facebookEncoding`."""
    return content.decode(facebookEncoding)


def parse_json(content):
    """Parse the JSON document `content` and return the resulting object."""
    return json.loads(content)


def get_json(r):
    """Decode the response `r`, drop everything before the first ``{`` and parse the rest as JSON."""
    return json.loads(strip_to_json(get_decoded_r(r)))


def digitToChar(digit):
    """Return the character for a single digit value: ``0``-``9``, then ``a``, ``b``, ..."""
    if digit < 10:
        return str(digit)
    return chr(ord('a') + digit - 10)


def str_base(number, base):
    """Return `number` written in the given `base`, with a leading ``-`` for negative numbers."""
    if number < 0:
        return '-' + str_base(-number, base)
    (d, m) = divmod(number, base)
    if d > 0:
        # Emit the higher-order digits first, then the lowest digit
        return str_base(d, base) + digitToChar(m)
    return digitToChar(m)


def generateMessageID(client_id=None):
    """Build a message id string from the current time in ms, a random integer and `client_id`."""
    k = now()
    l = int(random() * 4294967295)
    return "<{}:{}-{}@mail.projektitan.com>".format(k, l, client_id)


def getSignatureID():
    """Return a random integer below 2147483648, formatted with `hex`."""
    return hex(int(random() * 2147483648))


def generateOfflineThreadingID():
    """Return a decimal string combining the current time in ms with 22 random bits.

    The binary form of the timestamp is followed by the lowest 22 bits of a
    random integer (zero-padded on the left), and the whole bit string is then
    converted back to a decimal number.
    """
    ret = now()
    value = int(random() * 4294967295)
    # Left-pad with zeros, then keep exactly the last 22 binary digits
    string = ("0000000000000000000000" + format(value, 'b'))[-22:]
    msgs = format(ret, 'b') + string
    return str(int(msgs, 2))


def check_json(j):
    """Raise if the parsed response `j` carries an error; otherwise return ``None``.

    :param j: Mapping parsed from a response
    :raises FBchatFacebookError: If ``j['error']`` is present and not ``None``
    """
    error = j.get('error')
    if error is None:
        return
    if 'errorDescription' in j:
        # 'errorDescription' is in the users own language!
        raise FBchatFacebookError('Error #{} when sending request: {}'.format(j['error'], j['errorDescription']), fb_error_code=j['error'], fb_error_message=j['errorDescription'])
    elif isinstance(error, dict) and 'debug_info' in error and 'code' in error:
        # Only a mapping can carry 'code'/'debug_info'; a membership test on a
        # plain integer error code would raise TypeError instead of the
        # intended FBchatFacebookError
        raise FBchatFacebookError('Error #{} when sending request: {}'.format(j['error']['code'], repr(j['error']['debug_info'])), fb_error_code=j['error']['code'], fb_error_message=j['error']['debug_info'])
    else:
        raise FBchatFacebookError('Error {} when sending request'.format(j['error']), fb_error_code=j['error'])


def check_request(r, as_json=True):
    """Validate the response `r` and return its content.

    :param r: Response object providing ``ok``, ``status_code`` and the raw body
    :param as_json: If true, parse the body as JSON (after dropping everything
        before the first ``{``) and check it with `check_json`; otherwise
        return the decoded text unchanged
    :return: The parsed JSON object, or the decoded text
    :raises FBchatFacebookError: If ``r.ok`` is false, the body is empty, the
        body is not valid JSON, or the JSON reports an error
    """
    if not r.ok:
        raise FBchatFacebookError('Error when sending request: Got {} response'.format(r.status_code), request_status_code=r.status_code)

    content = get_decoded_r(r)

    if not content:
        raise FBchatFacebookError('Error when sending request: Got empty response')

    if as_json:
        content = strip_to_json(content)
        try:
            j = json.loads(content)
        except ValueError:
            raise FBchatFacebookError('Error while parsing JSON: {!r}'.format(content))
        check_json(j)
        log.debug(j)
        return j
    else:
        return content


def get_jsmods_require(j, index):
    """Return ``j['jsmods']['require'][0][index][0]``, or ``None`` if it is unavailable.

    A warning is logged when ``jsmods``/``require`` exist but do not have the
    expected structure.
    """
    if j.get('jsmods') and j['jsmods'].get('require'):
        try:
            return j['jsmods']['require'][0][index][0]
        except (KeyError, IndexError):
            log.warning('Error when getting jsmods_require: {}. Facebook might have changed protocol'.format(j))
    return None


def get_emojisize_from_tags(tags):
    """Return the `LIKES` entry named by the first ``hot_emoji_size:<name>`` tag.

    Returns ``None`` if `tags` is ``None``, if no such tag exists, or if the
    name is not a key of `LIKES` (that failure is logged).
    """
    if tags is None:
        return None
    tmp = [tag for tag in tags if tag.startswith('hot_emoji_size:')]
    if tmp:
        try:
            return LIKES[tmp[0].split(':')[1]]
        except (KeyError, IndexError):
            log.exception('Could not determine emoji size from {} - {}'.format(tags, tmp))
    return None


def require_list(list_):
    """Return a set of the items of `list_` if it is a list, else a set containing only `list_`."""
    if isinstance(list_, list):
        return set(list_)
    else:
        return {list_}


def mimetype_to_key(mimetype):
    """Return the id key for a mimetype.

    ``gif_id`` for ``image/gif``; ``video_id``, ``image_id`` or ``audio_id``
    for those main types; ``file_id`` for everything else, including an empty
    or missing mimetype.
    """
    if not mimetype:
        return "file_id"
    if mimetype == "image/gif":
        return "gif_id"
    x = mimetype.split("/")
    if x[0] in ["video", "image", "audio"]:
        return "%s_id" % x[0]
    return "file_id"


def get_files_from_urls(file_urls):
    """Download every url and return a list of ``(file name, content, mimetype)`` tuples.

    The mimetype comes from the ``Content-Type`` response header, falling back
    to a guess based on the url.
    """
    files = []
    for file_url in file_urls:
        r = requests.get(file_url)
        # We could possibly use r.headers.get('Content-Disposition'), see
        # https://stackoverflow.com/a/37060758
        files.append((
            basename(file_url),
            r.content,
            r.headers.get('Content-Type') or guess_type(file_url)[0],
        ))
    return files


@contextmanager
def get_files_from_paths(filenames):
    """Context manager yielding a list of ``(file name, open binary file, guessed mimetype)`` tuples.

    Every file that was opened is closed when the context exits, including
    when the body of the ``with`` statement raises or when opening one of the
    later files fails.
    """
    files = []
    try:
        for filename in filenames:
            files.append((
                basename(filename),
                open(filename, 'rb'),
                guess_type(filename)[0],
            ))
        yield files
    finally:
        # Without the try/finally an exception raised inside the `with` body
        # would skip this loop and leak every open handle
        for fn, fp, ft in files:
            fp.close()


def enum_extend_if_invalid(enumeration, value):
    """Return ``enumeration(value)``, first adding a new member if `value` is not valid.

    The new member is named ``UNKNOWN_<VALUE>`` (upper-cased) and a warning is logged.
    """
    try:
        return enumeration(value)
    except ValueError:
        log.warning("Failed parsing {.__name__}({!r}). Extending enum.".format(enumeration, value))
        aenum.extend_enum(enumeration, "UNKNOWN_{}".format(value).upper(), value)
        return enumeration(value)


def get_url_parameters(url, *args):
    """Return the first value of each query parameter in `args` that is present in `url`.

    Parameters missing from the url are skipped, so the result may be shorter than `args`.
    """
    params = parse_qs(urlparse(url).query)
    return [params[arg][0] for arg in args if params.get(arg)]


def get_url_parameter(url, param):
    """Return the first value of the query parameter `param` in `url`.

    :raises IndexError: If `param` is not present in `url`
    """
    return get_url_parameters(url, param)[0]