Source code for fbchat.utils

# -*- coding: UTF-8 -*-

from __future__ import unicode_literals
import re
import json
from time import time
from random import random
from contextlib import contextmanager
from mimetypes import guess_type
from os.path import basename
import warnings
import logging
import requests
from .models import *

    from urllib.parse import urlencode
    basestring = (str, bytes)
except ImportError:
    from urllib import urlencode
    basestring = basestring

# Python 2's `input` evaluates what the user types, whereas `raw_input` just
# returns it as a string. Rebind `input` to `raw_input` where it exists.
try:
    input = raw_input
except NameError:
    # Python 3: `raw_input` is gone and the builtin `input` already returns a string
    pass

# Log settings
# Module-level logger, registered under the name "client"; the request helpers
# further down in this file report through it.
log = logging.getLogger("client")
# Creates the console handler (a `StreamHandler` built without a stream
# argument writes to `sys.stderr`).
# NOTE(review): nothing in the lines visible here attaches this handler to
# `log` or sets a level - confirm whether `addHandler`/`setLevel` calls exist.
handler = logging.StreamHandler()

#: Default list of user agents
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_2) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.90 Safari/537.36",
    "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_10_3) AppleWebKit/601.1.10 (KHTML, like Gecko) Version/8.0.5 Safari/601.1.10",
    "Mozilla/5.0 (Windows NT 6.3; WOW64; ; NCT50_AAP285C84A1328) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/42.0.2311.90 Safari/537.36",
    "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/537.1 (KHTML, like Gecko) Chrome/22.0.1207.1 Safari/537.1",
    "Mozilla/5.0 (X11; CrOS i686 2268.111.0) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.57 Safari/536.11",
    "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/536.6 (KHTML, like Gecko) Chrome/20.0.1092.0 Safari/536.6"

    'large': EmojiSize.LARGE,
    'medium': EmojiSize.MEDIUM,
    'small': EmojiSize.SMALL,
    'l': EmojiSize.LARGE,
    'm': EmojiSize.MEDIUM,
    's': EmojiSize.SMALL

    # For standard requests
    0: 'unknown',
    1: 'female_singular',
    2: 'male_singular',
    3: 'female_singular_guess',
    4: 'male_singular_guess',
    5: 'mixed',
    6: 'neuter_singular',
    7: 'unknown_singular',
    8: 'female_plural',
    9: 'male_plural',
    10: 'neuter_plural',
    11: 'unknown_plural',

    # For graphql requests
    'UNKNOWN': 'unknown',
    'FEMALE': 'female_singular',
    'MALE': 'male_singular',
    #'': 'female_singular_guess',
    #'': 'male_singular_guess',
    #'': 'mixed',
    'NEUTER': 'neuter_singular',
    #'': 'unknown_singular',
    #'': 'female_plural',
    #'': 'male_plural',
    #'': 'neuter_plural',
    #'': 'unknown_plural',

class ReqUrl(object):
    """A class containing all urls used by `fbchat`"""

    # NOTE(review): every URL constant below is an empty string in this copy of
    # the file - presumably the real values were stripped; confirm against the
    # upstream source before relying on them.
    SEARCH = ""
    LOGIN = ""
    SEND = ""
    UNREAD_THREADS = ""
    UNSEEN_THREADS = ""
    THREADS = ""
    MOVE_THREAD = ""
    ARCHIVED_STATUS = ""
    PINNED_STATUS = ""
    MESSAGES = ""
    READ_STATUS = ""
    DELIVERED = ""
    MARK_SEEN = ""
    BASE = ""
    MOBILE = ""
    STICKY = ""
    PING = ""
    UPLOAD = ""
    INFO = ""
    CONNECT = ""
    REMOVE_USER = ""
    LOGOUT = ""
    ALL_USERS = ""
    SAVE_DEVICE = ""
    CHECKPOINT = ""
    THREAD_COLOR = ""
    THREAD_NICKNAME = ""
    THREAD_EMOJI = ""
    THREAD_IMAGE = ""
    THREAD_NAME = ""
    MESSAGE_REACTION = ""
    TYPING = ""
    GRAPHQL = ""
    ATTACHMENT_PHOTO = ""
    PLAN_CREATE = ""
    PLAN_INFO = ""
    PLAN_CHANGE = ""
    PLAN_PARTICIPATION = ""
    MODERN_SETTINGS_MENU = ""
    REMOVE_FRIEND = ""
    BLOCK_USER = ""
    UNBLOCK_USER = ""
    SAVE_ADMINS = ""
    APPROVAL_MODE = ""
    CREATE_GROUP = ""
    DELETE_THREAD = ""
    DELETE_MESSAGES = ""
    MUTE_THREAD = ""
    MUTE_REACTIONS = ""
    MUTE_MENTIONS = ""
    CREATE_POLL = ""
    UPDATE_VOTE = ""
    GET_POLL_OPTIONS = ""
    SEARCH_MESSAGES = ""
    MARK_SPAM = ""

    # Index of the pull channel currently in use (class-level default)
    pull_channel = 0

    def change_pull_channel(self, channel=None):
        """Switch the pull channel and rebuild the URLs that embed it.

        :param channel: Channel number to use. If ``None``, advance to the
            next channel, wrapping around after 4.

        The new values of ``pull_channel``, ``STICKY`` and ``PING`` are stored
        on the instance, so the class-level defaults are left untouched.
        """
        if channel is None:
            self.pull_channel = (self.pull_channel + 1) % 5  # Pull channel will be 0-4
        else:
            self.pull_channel = channel
        self.STICKY = "https://{}".format(self.pull_channel)
        self.PING = "https://{}".format(self.pull_channel)
#: Encoding used to decode raw response bodies
facebookEncoding = 'UTF-8'


def now():
    """Return the current time as an integer number of milliseconds."""
    return int(time()*1000)


def strip_to_json(text):
    """Drop everything in front of the first ``{`` in `text`.

    :raises FBchatException: If `text` contains no ``{`` at all
    """
    try:
        return text[text.index('{'):]
    except ValueError:
        raise FBchatException('No JSON object found: {!r}'.format(text))


def get_decoded_r(r):
    """Decode the raw body of the response `r`.

    Reads the bytes from ``r._content`` (presumably a `requests.Response` -
    confirm against callers).
    """
    return get_decoded(r._content)


def get_decoded(content):
    """Decode the bytes `content` using `facebookEncoding`."""
    return content.decode(facebookEncoding)


def parse_json(content):
    """Parse the JSON document `content` and return the resulting object."""
    return json.loads(content)


def get_json(r):
    """Decode the body of `r`, strip any prefix before the first ``{`` and parse it as JSON."""
    return json.loads(strip_to_json(get_decoded_r(r)))


def digitToChar(digit):
    """Return the character for one digit: ``0``-``9``, then ``a``, ``b``, ... for 10 and up."""
    if digit < 10:
        return str(digit)
    return chr(ord('a') + digit - 10)


def str_base(number, base):
    """Return the integer `number` written in the given `base`, as a string.

    Negative numbers are prefixed with ``-``; digits above 9 are rendered by
    `digitToChar`.
    """
    if number < 0:
        return '-' + str_base(-number, base)
    (d, m) = divmod(number, base)
    if d > 0:
        # Emit the higher-order digits first, then the current remainder
        return str_base(d, base) + digitToChar(m)
    return digitToChar(m)


def generateMessageID(client_id=None):
    """Build a message id from the current timestamp, a random integer and `client_id`."""
    timestamp = now()
    rand = int(random() * 4294967295)
    return "<{}:{}-{}>".format(timestamp, rand, client_id)


def getSignatureID():
    """Return a random integer below 2147483648, formatted with `hex`."""
    return hex(int(random() * 2147483648))


def generateOfflineThreadingID():
    """Return a decimal string built from the current timestamp and random bits.

    The binary form of the millisecond timestamp is followed by 22 bits taken
    from a random value; the combined bit string is converted back to decimal.
    """
    ret = now()
    value = int(random() * 4294967295)
    # Left-pad with zeros and keep only the last 22 binary digits
    string = ("0000000000000000000000" + format(value, 'b'))[-22:]
    msgs = format(ret, 'b') + string
    return str(int(msgs, 2))


def check_json(j):
    """Raise if the parsed response `j` reports an error.

    :param j: Parsed JSON response (a dict)
    :raises FBchatFacebookError: If ``j['error']`` is present and not ``None``
    """
    error = j.get('error')
    if error is None:
        return
    if 'errorDescription' in j:
        # 'errorDescription' is in the users own language!
        raise FBchatFacebookError('Error #{} when sending request: {}'.format(error, j['errorDescription']), fb_error_code=error, fb_error_message=j['errorDescription'])
    # Only look inside `error` when it is a mapping: a plain integer error code
    # would otherwise make the `in` test itself fail with a TypeError.
    if isinstance(error, dict) and 'debug_info' in error and 'code' in error:
        raise FBchatFacebookError('Error #{} when sending request: {}'.format(error['code'], repr(error['debug_info'])), fb_error_code=error['code'], fb_error_message=error['debug_info'])
    raise FBchatFacebookError('Error {} when sending request'.format(error), fb_error_code=error)


def check_request(r, as_json=True):
    """Validate the response `r` and return its content.

    :param r: Response object providing ``ok``, ``status_code`` and ``_content``
    :param as_json: If true, parse the body as JSON and check it for errors;
        otherwise return the decoded text unchanged
    :return: The parsed JSON object, or the decoded body text
    :raises FBchatFacebookError: On a non-ok status, an empty body, unparsable
        JSON, or an error reported inside the JSON
    :raises FBchatException: If `as_json` is set and the body contains no ``{``
    """
    if not r.ok:
        raise FBchatFacebookError('Error when sending request: Got {} response'.format(r.status_code), request_status_code=r.status_code)

    content = get_decoded_r(r)

    if not content:
        raise FBchatFacebookError('Error when sending request: Got empty response')

    if not as_json:
        return content

    content = strip_to_json(content)
    try:
        j = json.loads(content)
    except ValueError:
        raise FBchatFacebookError('Error while parsing JSON: {!r}'.format(content))
    check_json(j)
    log.debug(j)
    return j


def get_jsmods_require(j, index):
    """Return ``j['jsmods']['require'][0][index][0]``, or ``None`` if unavailable.

    A missing key or index is logged as a warning instead of being raised.
    """
    if j.get('jsmods') and j['jsmods'].get('require'):
        try:
            return j['jsmods']['require'][0][index][0]
        except (KeyError, IndexError):
            log.warning('Error when getting jsmods_require: {}. Facebook might have changed protocol'.format(j))
    return None


def get_emojisize_from_tags(tags):
    """Look up the emoji size named by the first ``hot_emoji_size:`` tag.

    :param tags: Iterable of tag strings, or ``None``
    :return: The matching entry of `LIKES`, or ``None`` when `tags` is
        ``None``, no such tag exists, or the size is not recognised
    """
    if tags is None:
        return None
    tmp = [tag for tag in tags if tag.startswith('hot_emoji_size:')]
    if len(tmp) > 0:
        try:
            return LIKES[tmp[0].split(':')[1]]
        except (KeyError, IndexError):
            log.exception('Could not determine emoji size from {} - {}'.format(tags, tmp))
    return None


def require_list(list_):
    """Return `list_` as a set; anything that is not a list becomes a one-element set."""
    if isinstance(list_, list):
        return set(list_)
    return {list_}


def mimetype_to_key(mimetype):
    """Map a mimetype to an id key: ``gif_id``, ``video_id``, ``image_id``, ``audio_id`` or ``file_id``."""
    if not mimetype:
        return "file_id"
    if mimetype == "image/gif":
        return "gif_id"
    x = mimetype.split("/")
    if x[0] in ["video", "image", "audio"]:
        return "%s_id" % x[0]
    return "file_id"


def get_files_from_urls(file_urls):
    """Download every URL and return a list of ``(name, content, mimetype)`` tuples.

    The mimetype comes from the ``Content-Type`` response header, falling back
    to a guess based on the URL.
    """
    files = []
    for file_url in file_urls:
        r = requests.get(file_url)
        # We could possibly use r.headers.get('Content-Disposition') for the
        # filename instead of the last URL component
        files.append((
            basename(file_url),
            r.content,
            r.headers.get('Content-Type') or guess_type(file_url)[0],
        ))
    return files


@contextmanager
def get_files_from_paths(filenames):
    """Context manager yielding ``(name, open file, mimetype)`` tuples for the given paths.

    Files are opened in binary mode. Every file that was opened is closed when
    the block exits, including when the block raises or when opening one of
    the later paths fails.
    """
    files = []
    try:
        for filename in filenames:
            files.append((
                basename(filename),
                open(filename, 'rb'),
                guess_type(filename)[0],
            ))
        yield files
    finally:
        for _, fp, _ in files:
            fp.close()