0.声明及版权信息

0.1.特别声明

  • 本仓库发布的DingDongPrintScCatch项目中涉及的任何脚本,仅用于测试和学习研究,禁止用于商业用途,不能保证其合法性,准确性,完整性和有效性,请根据情况自行判断。

  • 本项目内所有资源文件,禁止任何公众号、自媒体进行任何形式的转载、发布。

  • tuoxieleng 对任何脚本问题概不负责,包括但不限于由任何脚本错误导致的任何损失或损害。

  • 间接使用脚本的任何用户,包括但不限于建立VPS或在某些行为违反国家/地区法律或相关法规的情况下进行传播, tuoxieleng 对于由此引起的任何隐私泄漏或其他后果概不负责。

  • 请勿将DingDongPrintScCatch项目的任何内容用于商业或非法目的,否则后果自负。

  • 如果任何单位或个人认为该项目的脚本可能涉嫌侵犯其权利,则应及时通知并提供身份证明,所有权证明,我将在收到认证文件后删除相关脚本。

  • 以任何方式查看此项目的人或直接或间接使用DingDongPrintScCatch项目的任何脚本的使用者都应仔细阅读此声明。tuoxieleng 保留随时更改或补充此免责声明的权利。一旦使用并复制了任何相关脚本或DingDongPrintScCatch项目,则视为您已接受此免责声明。

  • 本项目遵循GPL-3.0 License协议,如果本特别声明与GPL-3.0 License协议有冲突之处,以本特别声明为准。

您使用或者复制了本仓库且本人制作的任何代码或项目,则视为已接受此声明,请仔细阅读
您在本声明未发出之时点使用或者复制了本仓库且本人制作的任何代码或项目且此时还在使用,则视为已接受此声明,请仔细阅读

1.项目介绍

1.1.项目描述

DingDongPrintScCatch是一款轻量级的Android设备指定区域内容抓取、分析及消息推送工具;

项目集成Python、ADB及Ocr等组件,快速且持续的监听Android移动设备指定区域内容变化,内置决策交易中断条件函数,实现决策参数化配置;

项目集成ApkInstaller,包括adb.exe、AdbWinApi.dll、AdbWinUsbApi.dll。开箱即用。

项目集成 方糖君 消息推送工具,可实现预警触发后的消息推送( 方糖服务号、Android、Bark iOS、企业微信群机器人、钉钉群机器人、飞书群机器人、测试号 )。

1.2.业务功能

ADB实现Android设备与Windows设备通信;

Python结合ADB实现对Android设备的设备信息捕捉、屏幕内容捕捉、模拟操作;

Python结合Ocr实现影像信息文本挖掘;

Python结合决策信息实时判断应用程序(叮咚买菜)定位站点是否存在资源余量,并推送消息提醒下单

1.3.版本发布

发布版本(V1.1)一方库如下:

主控函数【main(self, OCR_RECOGNITION_MESSAGE)】;

屏幕指定位置采集函数【cutPrintScreen(self)】;

屏幕内容采集函数【getPrintScreen(self, filePath, ADBpath)】;

文件路径设置函数【getfilePath(self, filePath)】;

设备信息抓取函数【getDeviceInfo(self)】;

信息推送Wechat函数【sendMessage2Wechat(self, message)】;

决策交易中断条件函数【judgeProcessBreakCondition(self, message, breakCondition)】;

设备页面整幅刷新函数【swipeDevice(self)】。

1.4.获取帮助

  • 项目地址:https://gitee.com/tuoxieleng/dingdong-printsc-catch
  • 如需关注项目最新动态或担心以后找不到项目,可以Watch、Star项目,同时也是对项目最好的支持

2.快速入门

项目基于Python 3.8.8。个人搭建及测试需保证本地开发环境符合要求(具体要求见章节2.1环境信息)。

2.1.环境信息

  • Python 3.8.8
https://www.python.org/downloads/
  • pypi 镜像

升级 pip 到最新的版本 (>=10.0.0) 后进行配置

python -m pip install --upgrade pip
pip config set global.index-url https://pypi.tuna.tsinghua.edu.cn/simple

如果您到 pip 默认源的网络连接较差,使用清华镜像来升级 pip

python -m pip install -i https://pypi.tuna.tsinghua.edu.cn/simple --upgrade pip
  • DingDongPrintScCatch源码
git clone -b master 
  • 目录结构

在这里插入图片描述

  • 依赖更新

依赖缺失时可采用两种方法

1.Anaconda更新;
2.pip install ModuleName。

2.2.项目设置

  • 项目位置
项目根目录(DingDongPrintScCatch)需保证放置于Windows环境下非中文目录
  • 个性化配置
DingDongPrintScCatch\Config目录中config.ini文件维护项目所有个性化参数

config.ini

[config]
# 默认UA
DEFAULT_USER_AGENT = "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/87.0.4280.88 Safari/537.36"
#测试设备(1080x2400)SCREEN_PARAM
#LEFT_PIXEL=0
#TOP_PIXEL=1500
#RIGHT_PIXEL=1080
#DOWN_PIXEL=1600
#测试设备(1080x2412 FHD+)SCREEN_PARAM
#LEFT_PIXEL=0
#TOP_PIXEL=1500
#RIGHT_PIXEL=1080
#DOWN_PIXEL=1600
#测试设备(1440x3216 QHD+)SCREEN_PARAM
LEFT_PIXEL=0
TOP_PIXEL=2000
RIGHT_PIXEL=1440
DOWN_PIXEL=2120
#屏幕滑动(纵向滑动 500->1500)
START_X=500
START_Y=500
END_X=500
END_Y=1500
DURATION_MS=600
#线程休眠时间(冲抵页面整幅刷新耗时)
THREAD_SLEEP_TIME=3

#百度云OCR账号信息
APP_ID=10687373
API_KEY=BIziiO4FQbN7n7iu5kPCuEMF
SECRET_KEY=yOxbhG3qZp0KvNkB42hstT4aNWXHOitZ
#决策交易中断条件
BREAK_CONDITION=本站当前可预约
#推送提示信息
TIPS=收货站点存在可预约时间,请立即跟进!
 
[messenger]
#如果想开启下单成功后消息推送,则将 enable 设置为 true,默认为 false 不开启推送
#开启消息推送必须填入 sckey(请务必调整为被通知人sckey)
#如何获取请参考 http://sc.ftqq.com/3.version
enable = false
sckey =SCT114444T2nONwrSQ6SgXnAIIHPyHxuCY

2.3.业务拓扑

在这里插入图片描述

3.项目启动

3.1.基建配置

  • 个性化配置准备

按照config.ini建议调整
在这里插入图片描述

  • ADB集成(V1.1版本已集成)
ADB全称Android Debug Bridge,中文翻译安卓调试桥。简单来说就是可以通过这个命令用电脑控制手机。
  • Android Device USB调试开启
打开手机设置——关于手机,连续点击5次版本号直到提示“您已处于开发者模式”;
在设置中找到“开发者选项”并打开;
在开发者选项中找到“USB调试”并打开;
用数据线连接电脑与手机,并点击“允许”,以授权USB调试;
电脑的命令行窗口中键入adb命令

在这里插入图片描述

  • ADB验证
adb devices

在这里插入图片描述

3.2.脚本启动

DingDongPrintScCatch>py dingDongPrintScCatch.py
 # _*_ coding:UTF-8 _*_
import numpy as np
import win32con
import ctypes
import ctypes.wintypes
import threading
import time
import os
import sys 
import subprocess
from PIL import Image
from aip import AipOcr
import shutil
import urllib
import requests
import string
sys.path.append(os.path.join(os.path.split(os.path.realpath(__file__))[0]+'\\Config'))
from config import global_config


'''
热键功能,独立线程控制

'''
class Hotkey(threading.Thread):                                                            #创建一个Thread.threading的扩展类  

    def run(self):  
        global EXIT                                                                        #全局变量,这个可以在不同线程间共用
        global RUN                                                                         #全局变量,这个可以在不同线程间共用
        user32 = ctypes.windll.user32                                                      #加载user32.dll

        try:
            if not user32.RegisterHotKey(None, ID1, 0, HOTKEY_RUN):
                print ("Unable to register id", ID1) 

            if not user32.RegisterHotKey(None, ID2, 0, HOTKEY_EXIT):
                print ("Unable to register id", ID2) 

            msg = ctypes.wintypes.MSG()                                                    #检测热键是否被按下,并在最后释放快捷键  

            while True:
                if user32.GetMessageA(ctypes.byref(msg), None, 0, 0) != 0:
                    if msg.message == win32con.WM_HOTKEY:
                        if msg.wParam == ID2:
                            EXIT=True
                            return
                        elif msg.wParam == ID1:
                            RUN=True
                    user32.TranslateMessage(ctypes.byref(msg))  
                    user32.DispatchMessageA(ctypes.byref(msg))   
        finally:
            user32.UnregisterHotKey(None, ID1)                                             #释放热键
            user32.UnregisterHotKey(None, ID2)

'''
实例定义
'''
class Ans():

    '''
    主控函数
    '''
    def main(self,OCR_RECOGNITION_MESSAGE):
        im_name=os.listdir(filePath)
        self.cutPrintScreen()                                                              #图片裁剪
        f=open(filePath+'\\'+'preOcrImg.jpg','rb')
        image=f.read()                                                                     #百度Ocr api限定入参名称image
        '''
        百度云账号,config.ini配置
        '''
        APP_ID=global_config.getRaw('config', 'APP_ID')
        API_KEY=global_config.getRaw('config', 'API_KEY')
        SECRET_KEY=global_config.getRaw('config', 'SECRET_KEY')
        client = AipOcr(APP_ID, API_KEY, SECRET_KEY)                                       #百度AipOcr,获取一个实例对象,可以调用相应函数
        information=client.basicGeneral(image)                                             #Ocr函数,输出JSON格式
        num=information['words_result_num']
        for i in range(0,num):
            OCR_RECOGNITION_MESSAGE+=information['words_result'][i]['words']
            OCR_RECOGNITION_MESSAGE+=' '

        OCR_RECOGNITION_MESSAGE=OCR_RECOGNITION_MESSAGE.translate(string.punctuation)      #字符串些处理,去除非敏感字符
        DEL=['以下','哪个','是','的','什么','下列','哪种']
        for x in DEL:
            OCR_RECOGNITION_MESSAGE = OCR_RECOGNITION_MESSAGE.replace(x,' ')

        print ('OCR_RECOGNITION_MESSAGE:',OCR_RECOGNITION_MESSAGE)
        f.close()
        '''
        shutil.rmtree(filePath)                                                            #文件清理,调试模式可不清理
        '''
        return OCR_RECOGNITION_MESSAGE

    '''
    屏幕指定位置采集函数
    '''
    def cutPrintScreen(self):
        leftPixel=int(global_config.getRaw('config', 'LEFT_PIXEL'))
        topPixel=int(global_config.getRaw('config', 'TOP_PIXEL'))
        rightPixel=int(global_config.getRaw('config', 'RIGHT_PIXEL'))
        downPixel=int(global_config.getRaw('config', 'DOWN_PIXEL'))
        #print ("config.ini FOUND SCREEN_PARAM IS : leftPixel:",leftPixel,"topPixel:",topPixel,"rightPixel:",rightPixel,"downPixel:",downPixel)
        im=Image.open(filePath+'\\'+"printScreenImg.jpg")
        if len(im.split()) == 4:
            r, g, b, a = im.split()
            im=Image.merge("RGB",(r,g,b))

        '''
        测试设备(1080x2400),其他分辨率需调整相应参数
        box(左,上,右,下)像素为单位
        '''
        box=(leftPixel,topPixel,rightPixel,downPixel)
        crop_im=im.crop(box)                                                              #裁剪函数
        im.close()
        crop_im.save(filePath+'\\'+'preOcrImg.jpg')                                       #保存本地

    '''
    屏幕内容采集函数
    '''
    def getPrintScreen(self,filePath,ADBpath):
        subprocess.call(ADBpath+"shell /system/bin/screencap -p /sdcard/printScreenImg.jpg",stdout=subprocess.PIPE, shell=subprocess.PIPE)
        subprocess.call(ADBpath+"pull /sdcard/printScreenImg.jpg "+filePath+"/printScreenImg.jpg",stdout=subprocess.PIPE, shell=subprocess.PIPE)

    '''
    文件路径设置函数
    '''
    def getfilePath(self,filePath):
        filePath=filePath.strip()                                                         #去除首位空格
        filePath=filePath.rstrip("\\")                                                    #去除尾部 \ 符号
        isDirectoryExists=os.path.exists(filePath)                                        #判断路径是否存在
        if not isDirectoryExists:                                                         #判断结果(如果不存在则创建目录,创建目录操作函数)
            os.makedirs(filePath.encode('utf-8'). decode('utf-8'))                        #UTF-8解码,避免磁盘中可能会出现乱码的情况

    '''
    设备信息抓取函数
    '''
    def getDeviceInfo(self):
        print ('------------------------------------------------')
        print ('DEVICE INFO CATCH START !')
        print ()
        print ('ADB SCRIPT CATCH (get-serialno)                :')
        subprocess.call(ADBpath+' get-serialno')                                          #设备序列号
        print ('ADB SCRIPT CATCH (getprop ro.product.model)    :')
        subprocess.call(ADBpath+' shell getprop ro.product.model')                        #设备型号
        print ('ADB SCRIPT CATCH (getprop ro.build.type)       :')
        subprocess.call(ADBpath+' shell getprop ro.build.type')                           #系统版本
        print ('ADB SCRIPT CATCH (wm size)                     :')
        subprocess.call(ADBpath+' shell wm size')                                         #设备分辨率
        print ('ADB SCRIPT CATCH (cat /proc/meminfo)           :')
        subprocess.call(ADBpath+' shell cat /proc/meminfo')                               #当前内存
        print ('ADB SCRIPT CATCH (wm density)                  :')
        subprocess.call(ADBpath+' shell wm density')                                      #物理密度
        print ()
        print ('DEVICE INFO CATCH END !')
        print ('------------------------------------------------')

    '''
    信息推送Wechat函数
    '''
    def sendMessage2Wechat(self,message):
        print ("BREAK_CONDITION('",BREAK_CONDITION,"') TRIGGERED ! PRINTSC_CATCH PROCESS CALL sendMessage2Wechat(self,message) METHOD !")
        url = 'http://sct.ftqq.com/{}.send'.format(global_config.getRaw('messenger', 'sckey'))
        payload = {
            "text":global_config.getRaw('config', 'TIPS'),
            "desp": message
        }
        headers = {
            'User-Agent':global_config.getRaw('config', 'DEFAULT_USER_AGENT')
        }
        requests.get(url, params=payload, headers=headers)

    '''
    决策交易中断条件函数
    '''
    def judgeProcessBreakCondition(self,message,breakCondition):
        if message.find(breakCondition)>=0:
            return True
        else:
            return False

    '''
    设备页面整幅刷新函数
    '''
    def swipeDevice(self):
        start_x=int(global_config.getRaw('config', 'START_X'))
        start_y=int(global_config.getRaw('config', 'START_Y'))
        end_x=int(global_config.getRaw('config', 'END_X'))
        end_y=int(global_config.getRaw('config', 'END_Y'))
        duration_ms=int(global_config.getRaw('config', 'DURATION_MS'))
        subprocess.call(ADBpath+" shell input swipe {} {} {} {} {}".format(start_x, start_y, end_x, end_y, duration_ms))
        time.sleep(int(global_config.getRaw('config', 'THREAD_SLEEP_TIME')))

if __name__ == '__main__':
        HOTKEY_RUN=0x0D                                                                    #屏幕捕捉:Enter
        HOTKEY_EXIT=win32con.VK_F8                                                         #结束程序:F8
        EXIT = False                                                                       #传递退出参数
        RUN = True                                                                         #传递启动参数(True则无需热键)
        ID1=106                                                                            #注册热键的唯一id,用来区分热键
        ID2=105
        LOOP_COUNT=0
        BREAK_CONDITION=global_config.getRaw('config', 'BREAK_CONDITION')

        printScreenCatch=os.path.split(os.path.realpath(__file__))[0]
        filePath=os.path.split(os.path.realpath(__file__))[0]+"\\PrintScreenCatchImg"
        ADBpath=printScreenCatch+'\\'+'ApkInstaller\\adb '
        ans=Ans()                                                                          #创建实例
        '''
        热键检测【目前配置(自动捕捉),无需热键检测】
        hotkey = Hotkey()
        hotkey.start()
        '''
        print ('ANDROID DEVICE USB DEBUG STATUS CONFIRM START !')
        subprocess.call(ADBpath+" wait-for-device",stdout=subprocess.PIPE, shell=subprocess.PIPE)
        print ('ANDROID DEVICE USB DEBUG STATUS CONFIRM END !')
        print ('PRINTSC_CATCH PROCESS SYS_PARAMS. 1、GLOBAL_SCREEN_CATCH:AUTO_CATCH')
        ans.getDeviceInfo()                                                                #设备信息抓取
        print ('PRINTSC_CATCH PROCESS START !')

        while(True):
            try:
                OCR_RECOGNITION_MESSAGE=''                                                 #装载识别出来的文字
                if RUN==True:
                    LOOP_COUNT+=1
                    print ("PRINTSC_CATCH PROCESS CURRENT LOOP CONDITION IS 'TRUE', PROCESS START. LOOP IS :",LOOP_COUNT," !")
                    ans.getfilePath(filePath)
                    start=time.time()
                    ans.getPrintScreen(filePath,ADBpath)
                    OCR_RECOGNITION_MESSAGE=ans.main(OCR_RECOGNITION_MESSAGE)
                    subprocess.call(ADBpath+' shell rm /sdcard/printScreenImg.jpg',stdout=subprocess.PIPE, shell=subprocess.PIPE)
                    end=time.time()
                    booleanFlag=ans.judgeProcessBreakCondition(OCR_RECOGNITION_MESSAGE,BREAK_CONDITION)
                    if booleanFlag:
                        print ("CURRENT OCR_RECOGNITION_MESSAGE IS :'",OCR_RECOGNITION_MESSAGE,"',MATCH THE BREAK_CONDITION('",BREAK_CONDITION,"') !")
                        ans.sendMessage2Wechat(OCR_RECOGNITION_MESSAGE)
                        print ('PRINTSC_CATCH PROCESS END !')
                        break
                    else:
                        ans.swipeDevice()
                        print ("CURRENT OCR_RECOGNITION_MESSAGE IS :'",OCR_RECOGNITION_MESSAGE,"',NOT MATCH THE BREAK_CONDITION('",BREAK_CONDITION,"') !")
                    print ("PRINTSC_CATCH PROCESS CURRENT LOOP CONDITION IS 'TRUE', PROCESS END. LOOP IS :",LOOP_COUNT," ! ","TIME COST :",end-start)
                    print ()
               
                elif EXIT==True:
                    print ('PRINTSC_CATCH PROCESS END !')
                    break
            except Exception as e:
                logger.error('EXCEPTION STACKTRACE:',e)
                break

在这里插入图片描述

4.验证情况

4.1.执行情况

在这里插入图片描述

4.2.命中情况

在这里插入图片描述

在这里插入图片描述

4.3.监听范围

在这里插入图片描述

4.4.消息推送

在这里插入图片描述

4.5.4月9日验证情况

1.为不影响正常用户下单,特选取酒类商品作为线上测试案例。
  • 命中情况在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
  • 测试说明
订单仅为测试使用,未影响正常用户下单;
当前订单已做超时取消处理;
订单下单时间与实际测试时间不符因为本人刚刚测试手残没有成功保存了命中瞬间的测试截图,该图为后补,各位知悉。

5.行文说明

5.1.关于个人

1.普通程序员。

5.2.关于项目

1.项目完全开源,所有涉及一方库均可被引用,注明来源即可;
2.请务必不要出于商业化或非法目的使用,国难当头;
3.请把珍贵的资源优先让给更需要的人;
4.请优先协助身边的老人,病患以及特殊人群使用此项目;
5.如果你抢购成功了,请尽可能帮助你身边的人。

5.3.写在最后

1.SkyKai大神使用Kotlin实现Application级别插件(DingDongHelper),并开源(https://github.com/Skykai521/DingDongHelper)。在此抱拳;
2.我曾经自学过两周时间的Python语言,DingDongPrintScCatch(V1.1)虽不能完全傻瓜式使用,但,在此抛砖引玉了;
3.住处疫情防控整体不容乐观,之前的志愿者工作暂缓。工作性质决定了能做的贡献仅限于此;
4.希望世界和平;
5.希望疫情早日结束!
Logo

华为开发者空间,是为全球开发者打造的专属开发空间,汇聚了华为优质开发资源及工具,致力于让每一位开发者拥有一台云主机,基于华为根生态开发、创新。

更多推荐