参数_sig分析

  • 使用GDA打开并分析app源码。豆瓣没有进行加密,直接全局搜索_sig关键字,分析一下就能定位到签名函数的位置com.douban.frodo.network.ApiSignatureHelper。查看源码发现使用HMAC-SHA1算法进行hash,并以一个固定的字符串作为密钥,直接使用frida进行hook就能获取密钥。属于比较简单的那种啦

HMAC-SHA1 算法python实现

def make_digest(message, key):
    """Return the Base64 text of the HMAC-SHA1 of *message* keyed with *key*.

    Both arguments are text; they are encoded as UTF-8 before hashing and the
    Base64 result is handed back as a ``str``.
    """
    raw_signature = hmac.new(
        key=bytes(key, 'UTF-8'),
        msg=bytes(message, 'UTF-8'),
        digestmod=hashlib.sha1,
    ).digest()
    return str(base64.b64encode(raw_signature), 'UTF-8')

算法可以直接在网上找到。需要注意的是,豆瓣在拿到这些原始值并拼接url的时候会对一些特殊字符进行转换,比如空格会被替换成+,=会被替换成%3D,不注意这些也会请求失败。剩下的就是分析每个链接的请求格式和参数:直接hook加密函数,把所需请求链接的参数打印出来并复制,然后按照格式生成_sig加密参数即可
以下是完整项目的源码

import requests
import time
import hmac
import base64
import hashlib
import redis
import json
import traceback
import random

# Turn off urllib3 warnings; every request in this file is sent with verify=False.
requests.packages.urllib3.disable_warnings()
# Redis list of pending `start` offsets for the group member-list crawl.
redis_key = "list:doupan"
# Redis list of JSON-encoded members waiting for the per-user crawl.
redis_user_key = "list:doupan:user"
# Shared Redis client; decode_responses=True makes replies come back as str.
r = redis.StrictRedis(host='127.0.0.1', port=6379, decode_responses=True)

# Token currently in use; user_workers() and test() rebind it to an entry of auth_list.
auth = None
# Candidate tokens (presumably masked with '*' for publication — replace before running).
auth_list = ["4ae2de71dda2cc54******", "96ec93cfa99b9521********"]

# Default request headers. The f-string below is evaluated right here, while
# `auth` is still None, so this initial value carries "Bearer None" until
# user_workers() or test() rebinds `headers`.
headers = {
    "Authorization": f"Bearer {auth}",
    "User-Agent": "api-client/1 com.douban.frodo/7.3.0(207) Android/29 product/sirius vendor/Xiaomi model/MI 8 SE  rom/miui6  network/wifi  udid/228e53eeaa5de163d9d0c39c712a301f77352075  platform/mobile",
    "Host": 'frodo.douban.com',
    "Connection": "Keep-Alive",
    "Accept-Encoding": "gzip"

}


def get_proxy():
    """Return the proxy mapping passed to ``requests`` for every call.

    The keys are URL schemes.  The original used ``'https:'`` (with a stray
    colon), which matches no scheme, so the https URLs requested by this file
    never went through the proxy.

    Returns:
        dict mapping ``'http'`` and ``'https'`` to the local proxy address.
    """
    proxy_address = '127.0.0.1:1080'
    return {'http': proxy_address, 'https': proxy_address}


def make_digest(message, key):
    """Compute HMAC-SHA1 of *message* under *key* and return it Base64-encoded.

    *message* and *key* are text and are encoded as UTF-8 first; the return
    value is the Base64 digest as a ``str``.
    """
    secret, payload = bytes(key, 'UTF-8'), bytes(message, 'UTF-8')
    mac = hmac.new(secret, payload, hashlib.sha1)
    return base64.b64encode(mac.digest()).decode('UTF-8')


def get_sig(data):
    """Sign *data* with the fixed key and return the Base64 HMAC-SHA1 text."""
    secret = 'bf7dddc7c9cfe6f7'
    return make_digest(message=data, key=secret)


def genarate():
    """Refill the ``redis_key`` list with page offsets 0, 30, ... below 6900.

    The list is deleted first; each offset is printed as it is pushed.
    """
    r.delete(redis_key)
    offset = 0
    while offset < 6900:
        print(offset)
        r.lpush(redis_key, offset)
        offset += 30
    print('生产完毕')


def genatrate_user():
    """Refill the ``redis_user_key`` list from ``members.txt``.

    Each line of ``members.txt`` is parsed as JSON; every entry of its
    ``members`` array is printed and pushed to Redis as a JSON string.
    The list is deleted before refilling.
    """
    r.delete(redis_user_key)
    with open('members.txt', 'r') as f:
        page_lines = f.readlines()
    for page_line in page_lines:
        page = json.loads(page_line)
        for member in page['members']:
            print(member)
            r.lpush(redis_user_key, json.dumps(member))
    print('生产完毕')


def get_url_by_query_data(query_data, base_url, t):
    """Return *base_url* with the ``_sig`` and ``_ts`` parameters appended.

    Args:
        query_data: the string to sign (passed to ``get_sig``).
        base_url: request URL that already contains a query string.
        t: timestamp placed in ``_ts``.

    ``=`` in the signature is escaped as ``%3D``; afterwards every ``+`` in
    the whole resulting URL (not only in the signature) becomes ``%2B``.
    """
    escaped_sig = get_sig(query_data).replace('=', '%3D')
    pieces = (base_url, '&_sig=', escaped_sig, '&_ts=', f'{t}')
    return ''.join(pieces).replace('+', '%2B')


def t_10():
    """Return the current Unix time truncated to whole seconds."""
    now = time.time()
    return int(now)


def get_json_by_url(url):
    """GET *url* and return the decoded JSON body.

    The request uses the module-level ``headers``, the proxies from
    ``get_proxy()`` and has certificate verification switched off.
    """
    request_options = {
        'url': url,
        'headers': headers,
        'proxies': get_proxy(),
        'verify': False,
    }
    return requests.get(**request_options).json()


def get_user_subject_json_by_uid(uid):
    """Return the subject-feed items of user *uid* for the first time slice.

    Two signed requests are made: the first lists the user's time slices, the
    second fetches the items of the first slice in that list.  Raises whatever
    the lookup ``['timeslices'][0]['slice']`` raises when the first response
    does not have that shape.
    """
    # Step 1: look up the available slices and keep the first one.
    ts = t_10()
    slices_url = get_url_by_query_data(
        f"GET&%2Fapi%2Fv2%2Fuser%2F{uid}%2Fsubjectfeed%2Ftimeslices&{auth}&{ts}",
        f"https://frodo.douban.com/api/v2/user/{uid}/subjectfeed/timeslices?apikey=0dad551ec0f84ed02907ff5c42e8ec70&channel=Xiaomi_Market&udid=228e53eeaa5de163d9d0c39c712a301f77352075&os_rom=miui6&oaid=18052974ff1b7b8e&timezone=Asia%2FShanghai",
        ts,
    )
    first_slice = get_json_by_url(slices_url)['timeslices'][0]['slice']

    # Step 2: fetch the items of that slice, signed with a fresh timestamp.
    ts = t_10()
    items_url = get_url_by_query_data(
        f"GET&%2Fapi%2Fv2%2Fuser%2F{uid}%2Fsubjectfeed%2Fitems&{auth}&{ts}",
        f"https://frodo.douban.com/api/v2/user/{uid}/subjectfeed/items?slice={first_slice}&apikey=0dad551ec0f84ed02907ff5c42e8ec70&channel=Xiaomi_Market&udid=228e53eeaa5de163d9d0c39c712a301f77352075&os_rom=miui6&oaid=18052974ff1b7b8e&timezone=Asia%2FShanghai",
        ts,
    )
    return get_json_by_url(items_url)


def get_user_profile_json_by_uid(uid):
    """Fetch ``/api/v2/user/{uid}`` with a signed request and return its JSON."""
    ts = t_10()
    signed_url = get_url_by_query_data(
        f"GET&%2Fapi%2Fv2%2Fuser%2F{uid}&{auth}&{ts}",
        f"https://frodo.douban.com/api/v2/user/{uid}?apikey=0dad551ec0f84ed02907ff5c42e8ec70&channel=Xiaomi_Market&udid=228e53eeaa5de163d9d0c39c712a301f77352075&os_rom=miui6&oaid=18052974ff1b7b8e&timezone=Asia%2FShanghai",
        ts,
    )
    return get_json_by_url(signed_url)


def get_user_group_json_by_uid(uid):
    """Fetch the ``profile_group_info`` of user *uid* and return its JSON.

    The request is signed and asks for up to 20 groups (``count=20``) with
    ``source_group_id=721085``.
    """
    ts = t_10()
    signed_url = get_url_by_query_data(
        f"GET&%2Fapi%2Fv2%2Fgroup%2Fuser%2F{uid}%2Fprofile_group_info&{auth}&{ts}",
        f"https://frodo.douban.com/api/v2/group/user/{uid}/profile_group_info?source=members_list&count=20&source_group_id=721085&apikey=0dad551ec0f84ed02907ff5c42e8ec70&channel=Xiaomi_Market&udid=228e53eeaa5de163d9d0c39c712a301f77352075&os_rom=miui6&oaid=18052974ff1b7b8e&timezone=Asia%2FShanghai",
        ts,
    )
    return get_json_by_url(signed_url)


def get_stat_json_by_uid(uid):
    """Return the ``genres`` collection statistics of user *uid*.

    One signed ``collection_stats`` request is made per type, in the order
    movie, book, music; the three URLs differ only in their ``type``
    parameter, so they are produced by one loop instead of three copies.

    Returns:
        dict with the keys ``'movie'``, ``'book'`` and ``'music'``.  Each value
        is the ``genres`` entry of the matching response, or an empty list when
        the response has none.  (The original fell back to ``{}`` for movie and
        music but ``[]`` for book; callers iterate the value, so one empty
        list is used throughout.)
    """
    res = {}
    for kind in ('movie', 'book', 'music'):
        stats_url = f"https://frodo.douban.com/api/v2/user/{uid}/collection_stats?type={kind}&udid=228e53eeaa5de163d9d0c39c712a301f77352075&rom=miui6&apikey=0dad551ec0f84ed02907ff5c42e8ec70&s=rexxar_new&channel=Xiaomi_Market&timezone=Asia%2FShanghai&device_id=228e53eeaa5de163d9d0c39c712a301f77352075&os_rom=miui6&oaid=18052974ff1b7b8e&apple=fcd16d795c0d89eb4fd85bbe36790e3a&mooncake=0f607264fc6318a92b9e13c65db7cd3c&sugar=46002&loc_id=118111"
        # Every request is signed with its own fresh timestamp.
        t = t_10()
        query_data = f"GET&%2Fapi%2Fv2%2Fuser%2F{uid}%2Fcollection_stats&{auth}&{t}"
        url = get_url_by_query_data(query_data, stats_url, t)
        res[kind] = get_json_by_url(url).get('genres', [])
    return res


def print_format_json(j):
    """Pretty-print *j* as JSON: sorted keys, 2-space indent, non-ASCII kept."""
    rendered = json.dumps(j, indent=2, ensure_ascii=False, sort_keys=True)
    print(rendered)


def members_worker():
    """Crawl the member list of group 721085 page by page into ``members.txt``.

    Pops ``start`` offsets from the ``redis_key`` list, fetches the matching
    page (``count=30``) and appends the JSON response as one line to
    ``members.txt``.  A response containing a ``code`` key is treated as a
    failure.  An offset whose request fails is pushed back for a later retry.
    Returns once the list is empty.
    """
    while True:
        i = r.rpop(redis_key)
        if i is None:
            # Queue drained: stop here instead of requesting `start=None` and
            # pushing None back into Redis.
            print('任务列表为空')
            return
        try:
            base_url = f"https://frodo.douban.com/api/v2/group/721085/members?start={i}&count=30&period=default&apikey=0dad551ec0f84ed02907ff5c42e8ec70&channel=Xiaomi_Market&udid=228e53eeaa5de163d9d0c39c712a301f77352075&os_rom=miui6&oaid=18052974ff1b7b8e&timezone=Asia%2FShanghai"
            t = t_10()
            # NOTE(review): the signed token is hard-coded here, while the
            # Authorization header comes from the module-level `headers` —
            # confirm the two are meant to differ.
            query_data = f"GET&%2Fapi%2Fv2%2Fgroup%2F721085%2Fmembers&96ec93cfa99b952bc4c5bb6efcd2bf81&{t}"
            # Same signing/escaping path as every other request in this file
            # ('=' -> %3D, '+' -> %2B); the original only escaped '+' here.
            url = get_url_by_query_data(query_data, base_url, t)
            res_json = get_json_by_url(url)
            if 'code' in res_json:
                raise Exception('bad code')
            with open('members.txt', 'a') as f:
                f.write(json.dumps(res_json) + '\n')
        except Exception as e:
            print(str(e))
            # Put the offset back so the page is retried.
            r.lpush(redis_key, i)
        # Pause after every attempt, failed ones included, so a failing page
        # is not retried in a tight loop.
        time.sleep(1)


def _bearer_headers(token):
    """Return the request headers that carry *token* as the Bearer credential."""
    return {
        "Authorization": f"Bearer {token}",
        "User-Agent": "api-client/1 com.douban.frodo/7.3.0(207) Android/29 product/sirius vendor/Xiaomi model/MI 8 SE  rom/miui6  network/wifi  udid/228e53eeaa5de163d9d0c39c712a301f77352075  platform/mobile",
        "Host": 'frodo.douban.com',
        "Connection": "Keep-Alive",
        "Accept-Encoding": "gzip",
    }


def _join_leading(items, limit, render):
    """Concatenate ``render(item)`` for at most *limit* leading *items*."""
    parts = []
    for item in items:
        if len(parts) >= limit:
            break
        parts.append(render(item))
    return ''.join(parts)


def _render_tag(entry):
    """Format one genre entry as ``name:value;``."""
    return entry['name'] + ':' + str(entry['value']) + ';'


def user_workers():
    """Crawl the queued users and append one CSV row per user.

    Pops JSON-encoded members from the ``redis_user_key`` list.  For each one
    a random token from ``auth_list`` is installed in the module-level
    ``auth``/``headers``, the profile, group list and collection statistics
    are fetched, and a row is appended to ``doupan_user.csv``.  On any failure
    the entry is pushed back for a later retry.  Returns once the list is
    empty.

    Fixes over the original:
      * an empty queue now stops the worker instead of falling through;
      * entries are parsed with ``json.loads`` (they are produced by
        ``json.dumps``) rather than ``eval``;
      * the retry path re-queues the raw JSON text (the original pushed the
        already-parsed dict) and no longer prints an unbound ``uid``;
      * rows are written with the ``csv`` module so a comma inside a field
        cannot shift the columns.
    """
    import csv  # local import: only this function writes CSV rows

    global auth, headers
    while True:
        raw_user = r.rpop(redis_user_key)
        print(raw_user)
        if not raw_user:
            print('任务列表为空')
            return
        uid = None  # keeps the failure message printable if parsing fails
        try:
            auth = random.choice(auth_list)
            headers = _bearer_headers(auth)

            user = json.loads(raw_user)
            uid = user['id']
            name = user['name']
            reg_time = user['reg_time']
            gender = user['gender']

            user_profile = get_user_profile_json_by_uid(uid)
            groups = get_user_group_json_by_uid(uid)
            stat_json = get_stat_json_by_uid(uid)

            # The first group entry is skipped, as in the original
            # (NOTE(review): presumably the source group itself — confirm).
            group_str = _join_leading(groups['groups'][1:], 10, lambda g: g['name'] + ';')
            row = [
                uid,
                name,
                gender,
                user_profile['birthday'] or ' ',
                user_profile['hometown'] or ' ',
                reg_time,
                user_profile['joined_group_count'],
                group_str,
                user_profile['statuses_count'],
                user_profile['movie_collected_count'],
                user_profile['book_collected_count'],
                user_profile['music_collected_count'],
                _join_leading(stat_json['movie'], 5, _render_tag),
                _join_leading(stat_json['book'], 5, _render_tag),
                _join_leading(stat_json['music'], 5, _render_tag),
                user_profile['following_count'],
                user_profile['followers_count'],
            ]
            with open('doupan_user.csv', 'a', encoding='utf8', newline='') as f:
                # str() first so None is written as 'None', as before.
                csv.writer(f, lineterminator='\n').writerow([str(v) for v in row])
            print(f'[* OK ]{uid}')
            # The original comment here says requests fail unless the rate is
            # limited, but no delay is applied.
            # NOTE(review): consider adding a pause between users.
        except Exception:
            # Put the untouched entry back so it is retried later.
            print(f'[* bad {uid}]')
            r.lpush(redis_user_key, raw_user)
            traceback.print_exc()


def run():
    """Start the worker threads and block until all of them finish."""
    from threading import Thread

    # A single thread keeps the request rate down.
    max_concurrence = 1
    # Use members_worker here instead to crawl the member list.
    target = user_workers

    workers = [Thread(target=target, args=()) for _ in range(max_concurrence)]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()


def write(line):
    """Append *line* plus a newline to ``doupan_user.csv`` (UTF-8)."""
    with open('doupan_user.csv', 'a', encoding='utf8') as csv_file:
        csv_file.writelines([line, '\n'])


def read():
    """Copy the valid UTF-8 lines of ``doupan_user.csv`` to ``doupan_utf8.csv``.

    The input is read as bytes line by line; each line that decodes as UTF-8
    is appended to ``doupan_utf8.csv`` (opened with ``utf-8-sig``), lines that
    do not decode are skipped.  The number of skipped lines is printed.

    Unlike the original, the output file is opened once rather than once per
    line, and only decode errors are counted — I/O errors now propagate
    instead of being silently tallied as bad lines.
    """
    undecodable = 0
    with open('doupan_user.csv', 'rb') as src, \
            open('doupan_utf8.csv', 'a', encoding='utf-8-sig', newline='') as dst:
        for raw_line in src:
            try:
                text = raw_line.decode('utf8')
            except UnicodeDecodeError:
                undecodable += 1
                continue
            dst.write(text)

    print(undecodable)


def test():
    """Manual smoke test: pick a random token and dump three API responses.

    Rebinds the module-level ``auth`` and ``headers``, then pretty-prints the
    group info, subject feed and collection statistics of one fixed user id.
    """
    global auth, headers
    token_index = random.randint(0, len(auth_list) - 1)
    auth = auth_list[token_index]
    headers = {
        "Authorization": f"Bearer {auth}",
        "User-Agent": "api-client/1 com.douban.frodo/7.3.0(207) Android/29 product/sirius vendor/Xiaomi model/MI 8 SE  rom/miui6  network/wifi  udid/228e53eeaa5de163d9d0c39c712a301f77352075  platform/mobile",
        "Host": 'frodo.douban.com',
        "Connection": "Keep-Alive",
        "Accept-Encoding": "gzip"
    }
    uid = 171928829
    print(auth)
    fetchers = (
        get_user_group_json_by_uid,
        get_user_subject_json_by_uid,
        get_stat_json_by_uid,
    )
    for fetch in fetchers:
        print_format_json(fetch(uid))


if __name__ == '__main__':
    # Script entry point. The commented-out calls are the other steps defined
    # in this file (fill the user queue, convert the CSV, smoke test);
    # uncomment the one to run. As written, only the worker threads start.
    # genatrate_user()
    # run()
    # read()
    # test()
    run()
最后修改:2021 年 12 月 11 日
如果觉得我的文章对你有用,请随意赞赏