注册
登录
 文档中心 产品介绍 开发指南 API接口 代码样例 使用帮助
Python3使用websocket调用Http代理IP的代码样例
站大爷 官方 2024-01-19 2250 浏览

温馨提示:

1.  安装运行所需的客户端: pip install websocket-client

2.  使用HTTP代理发送websocket请求

3.  在IP可用的情况下,客户端长时间不发送消息,服务端会断开连接

4.  运行环境要求 python3.x

5.  代码样例中的代理IP和端口均为虚构,请通过站大爷的API提取链接获取代理IP使用,避免报错


import gzip
import zlib
import websocket

OPCODE_DATA = (websocket.ABNF.OPCODE_TEXT, websocket.ABNF.OPCODE_BINARY)
url = "ws://echo.websocket.events/"
proxies = {
	"http_proxy_host": "168.38.24.125",
	"http_proxy_port": 16898,
	"http_proxy_auth": ("username", "password"),
}
ws = websocket.create_connection(url, **proxies)

def recv():
	try:
		frame = ws.recv_frame()
	except websocket.WebSocketException:
		return websocket.ABNF.OPCODE_CLOSE, None
	if not frame:
		raise websocket.WebSocketException("Not a valid frame %s" % frame)
	elif frame.opcode in OPCODE_DATA:
		return frame.opcode, frame.data
	elif frame.opcode == websocket.ABNF.OPCODE_CLOSE:
		ws.send_close()
		return frame.opcode, None
	elif frame.opcode == websocket.ABNF.OPCODE_PING:
		ws.pong(frame.data)
		return frame.opcode, frame.data

	return frame.opcode, frame.data

def recv_ws():
	opcode, data = recv()
	if opcode == websocket.ABNF.OPCODE_CLOSE:
		return
	if opcode == websocket.ABNF.OPCODE_TEXT and isinstance(data, bytes):
		data = str(data, "utf-8")
	if isinstance(data, bytes) and len(data) > 2 and data[:2] == b'\037\213':  # gzip magick
		try:
			data = "[gzip] " + str(gzip.decompress(data), "utf-8")
		except Exception:
			pass
	elif isinstance(data, bytes):
		try:
			data = "[zlib] " + str(zlib.decompress(data, -zlib.MAX_WBITS), "utf-8")
		except Exception:
			pass
	if isinstance(data, bytes):
		data = repr(data)

	print("< " + data)

def main():
	print("Press Ctrl+C to quit")
	while True:
		message = input("> ")
		ws.send(message)
		recv_ws()

if __name__ == "__main__":
	try:
		main()
	except KeyboardInterrupt:
		print('\nbye')
	except Exception as e:
		print(e)


立即注册站大爷用户,免费试用全部产品
立即注册站大爷用户,免费试用全部产品