//lark_bot_server.py#!/usr/bin/envpython#--coding:utf-8--fromhttp.serverimportBaseHTTPRequestHandler,HTTPServerfromosimportpathimportjsonfromurllibimportrequest,parseAPP_ID="abcdefg"APP_SECRET="hijklmn"APP_VERIFICATION_TOKEN="opqrst"classRequestHandler:defdo_GET:self.send_responseself.send_headerself.end_headersself.wfile.writedefdo_POST:#解析请求bodyreq_body=self.rfile.read)obj=json.loads)#校验verificationtoken是否匹配,token不匹配说明该回调并非来自开发平台token=obj.getiftoken!=APP_VERIFICATION_TOKEN:printself.responsereturn#根据type处理不同类型事件type=obj.getif"url_verification"==type:#验证请求URL是否有效self.handle_request_url_verifyelif"event_callback"==type:#事件回调#获取事件内容和类型,并进行相应处理,此处只关注给机器人推送的消息事件event=obj.getifevent.get=="message":self.handle_messagereturnreturndefhandle_request_url_verify:#原样返回challenge字段内容challenge=post_obj.getrsp={"challenge":challenge}self.response)returndefhandle_message:#此处只处理text类型消息,其他类型消息忽略msg_type=event.getifmsg_type!="text":printself.responsereturn#调用发消息API之前,先要获取API调用凭证:tenant_access_tokenaccess_token=self.get_tenant_access_tokenifaccess_token=="":self.responsereturn#机器人echo收到的消息self.send_message,event.get)self.responsereturndefresponse:self.send_responseself.send_headerself.end_headersself.wfile.write)defget_tenant_access_token:url="https://open.feishu.cn/open-apis/auth/v3/tenant_access_token/internal/"headers={"Content-Type":"application/json"}req_body={"app_id":APP_ID,"app_secret":APP_SECRET}data=bytes,encoding="utf8")req=request.Requesttry:response=request.urlopenexceptExceptionase:print.decode)return""rsp_body=response.read.decodersp_dict=json.loadscode=rsp_dict.getifcode!=0:printreturn""returnrsp_dict.getdefsend_message:url="https://open.feishu.cn/open-apis/message/v4/send/"headers={"Content-Type":"application/json","Authorization":"Bearer"+token}req_body={"open_id":open_id,"msg_type":"text","content":{"text":text}}data=bytes,encoding="utf8")req=request.Requesttry:response=request.urlopenexceptExceptionase:print.decode)returnrsp_body=response.read.decodersp_dict=json.loadscode=rsp_dict.getifcode!=0:print)defrun:port=3500server_address=httpd=HTTPServerprinthttpd.serve_foreverif__name__=="__main__":run//lark_bot_cli.pyimportjsonimporthttp.clientif__name__=="__main__":#要发送的数据data={"type":"url_verification","challenge":"tqz_for_test","token":"..."}#本地服务器的IP地址和端口号server_address=#发送POST请求到/path,注意这里的路径需要匹配到RequestHandler中的URL路由规则conn=http.client.HTTPConnectionconn.request,headers={"Content-type":"application/json"})response=conn.getresponseifresponse.status==200:#请求成功,处理返回的数据response_data=response.readprint)else:#请求失败printconn.close
文章为作者独立观点,不代表 股票程序化软件自动交易接口观点