pyqt抢购nft

core


# -*- coding: utf-8 -*-
import multiprocessing
import sys
import time
from PyQt5.QtCore import QThread, pyqtSignal
from PyQt5.QtWidgets import QApplication, QMainWindow
from qiankunui import Ui_MainWindow
import requests
import cgitb
from datetime import datetime
from apscheduler.schedulers.blocking import BlockingScheduler
from apscheduler.schedulers.qt import QtScheduler
from itertools import cycle

from requests.packages import urllib3

urllib3.disable_warnings()
cgitb.enable(format='text')


class MyMainForm(QMainWindow, Ui_MainWindow):
    def __init__(self, parent=None):
        super(MyMainForm, self).__init__(parent)
        self.setupUi(self)
        # 2.手动多线程发包
        self.work = WorkThread(self)
        self.start.clicked.connect(self.execute)
        self.stop.clicked.connect(self.stoping)

        # 1.登录
        self.ui_login = Login(self)
        self.login.clicked.connect(self.start_login)

        # 3.auto
        self.auto_buy.clicked.connect(self.auto_buy1)

    def auto_buy1(self):
        self.work.auto_buy()
        self.work.trigger.connect(self.display)

    def start_login(self):
        self.ui_login.start()
        self.ui_login.trigger.connect(self.display)

    def stoping(self):
        self.work.stop()
        self.work.trigger.connect(self.display)

    def execute(self):
        # 启动线程
        self.work.start()
        # 线程自定义信号连接的槽函数
        self.work.trigger.connect(self.display)

    def display(self, str):
        # 由于自定义信号时自动传递一个字符串参数,所以在这个槽函数中要接受一个参数
        self.listWidget.addItem(str)


class Login(QThread):
    trigger = pyqtSignal(str)

    def __init__(self, demo):
        super(Login, self).__init__()
        self.demo = demo

    def run(self):
        username = self.demo.username.text().strip()
        password = self.demo.password.text().strip()

        try:
            burp0_url = "https://x.com"
            burp0_headers = {"Sec-Ch-Ua": "\"(Not(A:Brand\";v=\"8\", \"Chromium\";v=\"101\"", "Sid": "35001800000",
                             "Sec-Ch-Ua-Mobile": "?0",
                             "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.54 Safari/537.36",
                             "Content-Type": "application/x-www-form-urlencoded", "Newversion": "H5_1.0",
                             "Sec-Ch-Ua-Platform": "\"Windows\"", "Source": "218", "Accept": "*/*",
                             "Origin": "http://nt.fengkuangtiyu.cn", "Sec-Fetch-Site": "cross-site",
                             "Sec-Fetch-Mode": "cors", "Sec-Fetch-Dest": "empty",
                             "Referer": "http://nt.fengkuangtiyu.cn/",
                             "Accept-Encoding": "gzip, deflate", "Accept-Language": "zh-CN,zh;q=0.9",
                             "Connection": "close"}
            burp0_data = {"phone": f"{username}", "loginPwd": f"{password}"}
            res = requests.post(burp0_url, headers=burp0_headers, data=burp0_data, verify=False)
        except Exception as e:
            print(e)

        res_json = res.json()
        print(res_json["data"]["loginSign"])

        self.trigger.emit(res.text)
        self.demo.token.setText(res_json["data"]["loginSign"])
        self.demo.userid.setText(res_json["data"]["userId"])


class WorkThread(QThread):
    # 自定义信号对象。参数str就代表这个信号可以传一个字符串
    trigger = pyqtSignal(str)

    def __init__(self, demo):  # 3
        super(WorkThread, self).__init__()
        self.demo = demo
        self.pool = None

    def callback(self, x):
        self.trigger.emit(x)

    # 主要启动函数
    def run(self):
        # 重写线程执行的run函数
        # 触发自定义信号

        userid = self.demo.userid.text().strip()
        token = self.demo.token.text().strip()
        goodsid = self.demo.goodsid.text().strip()
        issueid = self.demo.issueid.text().strip()
        _proxy = self.demo.proxy_pool.text().strip()
        try:
            res = requests.get(_proxy)
            self.ips = res.text.splitlines()

        except Exception as e:
            print(e)

        self.pool = multiprocessing.Pool()
        for ip in cycle(self.ips):
            self.pool.apply_async(self.work, (userid, token, goodsid, issueid, ip), error_callback=self.callback,
                                  callback=self.callback)
        self.pool.close()
        self.pool.join()

    @staticmethod
    def work(userid, token, goodsid, issueid, ip=None):
        proxies = {"http": ip, "https": ip}
        burp0_url = "https://x"
        burp0_headers = {"Sec-Ch-Ua": "\"(Not(A:Brand\";v=\"8\", \"Chromium\";v=\"101\"", "Sid": "35001800000",
                         "Sec-Ch-Ua-Mobile": "?0",
                         "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/101.0.4951.54 Safari/537.36",
                         "Content-Type": "application/x-www-form-urlencoded", "Newversion": "H5_1.0",
                         "Loginsign": f"{token}", "Sec-Ch-Ua-Platform": "\"Windows\"",
                         "Source": "218", "Accept": "*/*", "Origin": "http://nt.fengkuangtiyu.cn",
                         "Sec-Fetch-Site": "cross-site", "Sec-Fetch-Mode": "cors", "Sec-Fetch-Dest": "empty",
                         "Referer": "http://nt.fengkuangtiyu.cn/", "Accept-Encoding": "gzip, deflate",
                         "Accept-Language": "zh-CN,zh;q=0.9", "Connection": "close"}
        burp0_data = {"userId": f"{userid}", "goodsId": f"{goodsid}", "issueId": f"{issueid}"}
        try:
            res = requests.post(burp0_url, headers=burp0_headers, data=burp0_data, proxies=proxies)
        except:
            pass
        time.sleep(0.1)
        # 通过自定义信号把待显示的字符串传递给槽函数
        print(res.text)
        return res.text

    # 停止
    def stop(self):
        print('stop')
        self.pool.terminate()

        self.is_running = False
        self.terminate()
        self.trigger.emit("stop")

    # 定时抢购
    def auto_buy(self):
        ss = self.demo.ss.text().strip()
        y = self.demo.year.text().strip()
        mon = self.demo.mon.text().strip()
        days = self.demo.days.text().strip()
        hours = self.demo.hours.text().strip()
        mins = self.demo.mins.text().strip()

        scheduler = QtScheduler()
        scheduler.add_job(self.run, 'date',
                          run_date=datetime(int(y), int(mon), int(days), int(hours), int(mins), int(ss)),
                          )

        scheduler.start()


if __name__ == "__main__":
    app = QApplication(sys.argv)
    myWin = MyMainForm()
    myWin.show()
    sys.exit(app.exec_())

ui

pyqt

pyqt极速入门

安装

安装pyqt
pip install pyqt5

安装简化版qt designer
https://build-system.fman.io/qt-designer-download

qt designer

先用qt designer画框

然后将ui转为py代码

pyuic5 -o destination.py original.ui 

ui demo

scapy流量分析

scapy流量分析

关于流量分析,在某些上了手段的场景,主动或者被动获取到了流量,这个时候可以用来威胁猎杀,捞出你想要的信息

权限维持-py服务

权限维持-py服务

# encoding=utf-8
import win32serviceutil
import win32service
import win32event
import os
import logging
import inspect


class PySerTest(win32serviceutil.ServiceFramework):
    _svc_name_ = "PySerTest"
    _svc_display_name_ = "Py Service Test"  # 服务显示的名称,可以自己修改
    _svc_description_ = "This is a python service test code "  # 服务显示的描述

    def __init__(self, args):
        win32serviceutil.ServiceFramework.__init__(self, args)
        self.hWaitStop = win32event.CreateEvent(None, 0, 0, None)
        self.logger = self._getLogger()
        self.run = True

    def _getLogger(self):
        logger = logging.getLogger('[PythonService]')
        this_file = inspect.getfile(inspect.currentframe())
        dirpath = os.path.abspath(os.path.dirname(this_file))
        # handler = logging.FileHandler(os.path.join(dirpath, "service.log"))
        handler = logging.FileHandler("c:\\service.log")
        formatter = logging.Formatter('%(asctime)s %(name)-12s %(levelname)-8s %(message)s')
        handler.setFormatter(formatter)
        logger.addHandler(handler)
        logger.setLevel(logging.INFO)
        return logger

    def SvcDoRun(self):
        # 在此编写自己的业务程序
        import time
        self.logger.info("service is run....")
        while self.run:
            self.logger.info("I am runing....")
            time.sleep(2)

    def SvcStop(self):
        self.logger.info("service is stop....")
        self.ReportServiceStatus(win32service.SERVICE_STOP_PENDING)
        win32event.SetEvent(self.hWaitStop)
        self.ReportServiceStatus(win32service.SERVICE_STOPPED)
        self.run = False


if __name__ == '__main__':
    # win32serviceutil.HandleCommandLine(PythonService)
    import sys
    import servicemanager

    if len(sys.argv) == 1:
        try:
            evtsrc_dll = os.path.abspath(servicemanager.__file__)
            servicemanager.PrepareToHostSingle(PySerTest)  # 如果修改过名字,名字要统一
            servicemanager.Initialize('PySerTest', evtsrc_dll)  # 如果修改过名字,名字要统一
            servicemanager.StartServiceCtrlDispatcher()
        except win32service.error as details:
            import winerror

            if details == winerror.ERROR_FAILED_SERVICE_CONTROLLER_CONNECT:
                win32serviceutil.usage()
    else:
        win32serviceutil.HandleCommandLine(PySerTest)  # 如果修改过名字,名字要统一

权限维持-py计划任务

权限维持-py计划任务

底层还是调用api,com玩来玩去

纵观历史,从无非就是从vb,powershell,c#,以及后面的各类小众语言演变出来的调用api的维权