创建云函数,把代码放入函数容器内运行。公众号内对接API,即可调用函数。

直接看代码:
#coding: UTF-8
#引入相关模块
import time, xmltodict, socket, json, requests

接受微信发来的参数
def main_handler(event, context):
    print(event['httpMethod'])
    print(context)
#判断为GET请求,一般是微信认证信息,我这里就不验证加密了,直接返回约定好的参数就行了。
    if event['httpMethod'] == 'GET':
        return {
        "isBase64Encoded": False,
        "statusCode": 200,
        "headers": {"Content-Type": "plain/text"},
        "body": event['queryString']['echostr']
        }
#port请求一般为消息或数据,判断下属于哪种类型,然后做相关操作就行了。
    if event['httpMethod'] == 'POST':
        webData = event['body']
        xml_dict = xmltodict.parse(webData)
        xml_dict = xml_dict.get("xml")  
        wx_id = xml_dict.get("FromUserName")
        msg_type = xml_dict.get('MsgType')
        content = xml_dict.get("Content")
        if msg_type == "text":
            #表示收到的是文本消息
              pass #做点什么?
        #return  返回给用户的信息 
        else:
    #构造返回值,经由微信服务器回复给用户的消息
            return wx_text(xml_dict,"请输入文字消息")

def wx_text(list_1,str_1):
#处理返回消息,详情参见公众号开发文档
    xml_dict = list_1
    resp_dict = {
        "xml":{
            "ToUserName": xml_dict.get("FromUserName"),
            "FromUserName": xml_dict.get("ToUserName"),
            "CreateTime": int(time.time()),
            "MsgType": "text",
            "Content": str_1
            }
        }  
    return {
    "isBase64Encoded": False,
    "statusCode": 200,
    "headers": {"Content-Type": "application/xml"},
    "body": xmltodict.unparse(resp_dict)
    }

实例:

原因:

工作原因,维护需要经常用到电脑。手机远程控制电脑操作,我已经用了几年了,以前都是电脑开着,从来都不关机。奔着 低碳出行,保护环境的原则(穷,实际我想省点电)然后就有了该项目。

功能:

手机远程开机,然后远程操作电脑。维护相关设备。

开始吧

远程启动我的电脑,其中涉及到东西较多,可以自己科普一下。

小米路由M3G一台,刷的是老毛子固件。其中用到wol唤醒功能,frp内网穿透。网上有各种免费的服务器(提供公网)

电脑开启网卡唤醒功能。魔术包等等。。。 具体自己百度

准备就绪后直接云函数直接上代码:

#coding: UTF-8
import time, xmltodict, socket, json, requests

def kj():
#GET请求唤醒电脑x
    headers = {'Host': 'x.xxxx.vip:880',
    'Connection': 'keep-alive',
    'Authorization': 'Basic cm9vdDphZG1pbkAxMTMz',
    'Accept': 'application/json, text/javascript, */*; q=0.01',
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/96.0.4664.45 Safari/537.36',
    'X-Requested-With': 'XMLHttpRequest',
    'Referer': 'http://m.ctou.vip:880/Advanced_WOL_Content.asp',
    'Accept-Encoding': 'gzip, deflate',
    'Accept-Language': 'zh-CN,zh;q=0.9'}
    try:   
        res = requests.get("http://m.ctou.vip:880/wol_action.asp?dstmac=E4%3A54%3AE8%3AA1%3AAB%3A32",headers=headers).json().get("wol_mac")
    except:
        return "设备不在线"
    return "设备地址(%s),系统正在启动中请稍后。。。"%res
#返回开机设备mac地址

def main_handler(event, context):
    print(event['httpMethod'])
    print(context)
    if event['httpMethod'] == 'GET':
        return {
        "isBase64Encoded": False,
        "statusCode": 200,
        "headers": {"Content-Type": "plain/text"},
        "body": event['queryString']['echostr']
        }

    if event['httpMethod'] == 'POST':
        webData = event['body']
        xml_dict = xmltodict.parse(webData)
        xml_dict = xml_dict.get("xml")  
        wx_id = xml_dict.get("FromUserName")
        msg_type = xml_dict.get('MsgType')
        content = xml_dict.get("Content")
        if msg_type == "text":
            #表示收到的是文本消息
            if content[0] == "@":
                li = [wx_id,content[1:]]
                return wx_text(xml_dict,client_sf(s1,li)[1])
                #return wx_text(xml_dict,"您未绑定服务器,请先联系管理员绑定ID:%s"%wx_id)
               # khd(s1,[wx_id,content[1:]])
            elif content == "开机":
                return wx_text(xml_dict,kj())
            #构造返回值,经由微信服务器回复给用户的消息
            else:
                return wx_text(xml_dict,content)
        else:
            return wx_text(xml_dict,"请输入文字消息")

def wx_text(list_1,str_1):
    xml_dict = list_1
    resp_dict = {
        "xml":{
            "ToUserName": xml_dict.get("FromUserName"),
            "FromUserName": xml_dict.get("ToUserName"),
            "CreateTime": int(time.time()),
            "MsgType": "text",
            "Content": str_1
            }
        }  
    return {
    "isBase64Encoded": False,
    "statusCode": 200,
    "headers": {"Content-Type": "application/xml"},
    "body": xmltodict.unparse(resp_dict)
    }
后续:

当然上面这些只是开始,后续我们还可以增加你想要的功能。也可带入以上代码内使用。

客户端:
def client_sf(host,li):
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.connect((host[1], host[2]))
    js = json.dumps(li).encode("utf-8")   
    arrBuf = bytearray(b'\xff\xaa\xff\xaa')   
    picSize = len(js) 
    datalen = 64 + picSize
    arrBuf += bytearray(datalen.to_bytes(4, byteorder='little'))
    guid = 23458283482894382928948
    arrBuf += bytearray(guid.to_bytes(64, byteorder='little'))
    arrBuf += js
    sock.sendall(arrBuf)  
    while True:
        try:
            str1 = sock.recv(8)
            if not str1:break
            data = bytearray(str1)
            headIndex = data.find(b'\xff\xaa\xff\xaa')
  
            if headIndex == 0:
                allLen = int.from_bytes(data[headIndex+4:headIndex+8], byteorder='little')
                curSize = 0
                allData = b''
                while curSize < allLen:
                    data = sock.recv(1024)
                    allData += data
                    curSize += len(data)
                arrGuid = allData[0:64]
                tail = arrGuid.find(b'\x00')
                arrGuid = arrGuid[0:tail]
                Data = allData[64:]
                a = json.loads(Data.decode("utf-8"))
                return a   
        except Exception as e:
            print(e)
            break
    sock.close()
服务端代码:

加上frp内网穿透。后面的靠自己想象了。。。

# coding=utf-8
import socketserver, json

class MyHandler(socketserver.BaseRequestHandler):
  
    def handle(self):
        ip,port =self.client_address
        while True:
            try:
                str1 = self.request.recv(8)
                data = bytearray(str1)
                headIndex = data.find(b'\xff\xaa\xff\xaa')
      
                if headIndex == 0:
                    allLen = int.from_bytes(data[headIndex+4:headIndex+8], byteorder='little')
                    curSize = 0
                    allData = b''
                    while curSize < allLen:
                        data = self.request.recv(1024)
                        allData += data
                        curSize += len(data)
                    arrGuid = allData[0:64]
                    #去除guid末尾的0
                    tail = arrGuid.find(b'\x00')
                    arrGuid = arrGuid[0:tail]
                    Data = allData[64:]
                    a = json.loads(Data.decode("utf-8"))
                    print(a)
                    ###############一次性发送数据包###############
                    js = json.dumps(a).encode("utf-8")
                    #包头标志
                    arrBuf = bytearray(b'\xff\xaa\xff\xaa')   
                    #data大小
                    picSize = len(js) 
                    #数据体长度 = guid大小(固定) + data大小
                    datalen = 64 + picSize
                    #组合数据包
                    arrBuf += bytearray(datalen.to_bytes(4, byteorder='little'))
                    guid = 23458283482894382928948
                    arrBuf += bytearray(guid.to_bytes(64, byteorder='little'))
                    arrBuf += js
                    self.request.sendall(arrBuf)
                    ############################################
                    return a
            except Exception as e:
                print(e)
                break
        self.request.close()
        print("IP:%s 端口:%s 用户断开连接"%(ip,port))
        #MyHandler.f(a)
  
  
        #################################################
  
if __name__ == '__main__':
    server = socketserver.ThreadingTCPServer(("0.0.0.0", 8088), MyHandler, bind_and_activate=True)
  
    print('服务已启动')
    server.serve_forever()
最后修改:2021 年 12 月 14 日
如果觉得我的文章对你有用,请随意赞赏