跳转到内容
彼岸论坛
欢迎抵达彼岸 彼岸花开 此处谁在 -彼岸论坛

[Python] 用 Python 接入实时行情接口


已推荐帖子

发表于

请求 K 线数据

import time
import requests
import json
 
# Extra headers
test_headers = {
    'Content-Type':'application/json'
}
 
'''
github: https://github.com/alltick/realtime-forex-crypto-stock-tick-finance-websocket-api
申请免费 token: https://alltick.co/register
官网: https://alltick.co
将如下 JSON 进行 url 的 encode ,复制到 http 的查询字符串的 query 字段里
{"trace":"python_http_test1","data":{"code":"AAPL.US","kline_type":1,"kline_timestamp_end":0,"query_kline_num":2,"adjust_type":0}}
'''
test_url1 = 'https://quote.tradeswitcher.com/quote-stock-b-api/kline?token=e945d7d9-9e6e-4721-922a-7251a9d311d0-1678159756806&query=%7B%22trace%22%3A%22python_http_test1%22%2C%22data%22%3A%7B%22code%22%3A%22AAPL.US%22%2C%22kline_type%22%3A1%2C%22kline_timestamp_end%22%3A0%2C%22query_kline_num%22%3A2%2C%22adjust_type%22%3A0%7D%7D'
 
resp1 = requests.get(url=test_url1, headers=test_headers)
 
# Decoded text returned by the request
text1 = resp1.text
print(text1)

上面代码中是以查询苹果股票(AAPL)分钟 K 线为例子的,如果想查询其它类型的 K 线数据则 kline_type 传入以下值:1-分钟 K ,2-为 5 分钟 K ,3-为 15 分钟 K ,4-为 30 分钟 K ,5-为小时 K ,6-为 2 小时 K ,7-为 4 小时 K ,8-为日 K ,9-为周 K ,10-为月 K 。

请求最新报价成交数据

import time
import requests
import json
 
# Extra headers
test_headers = {
    'Content-Type':'application/json'
}
 
'''
github: https://github.com/alltick/realtime-forex-crypto-stock-tick-finance-websocket-api
申请免费 token: https://alltick.co/register
官网: https://alltick.co
将如下 JSON 进行 url 的 encode ,复制到 http 的查询字符串的 query 字段里
{"trace":"python_http_test2","data":{"symbol_list":[{"code": "700.HK"},{"code": "UNH.US"},{"code": "600416.SH"}]}}
'''
test_url1 = 'https://quote.tradeswitcher.com/quote-stock-b-api/trade-tick?token=e945d7d9-9e6e-4721-922a-7251a9d311d0-1678159756806&query=%7B%22trace%22%3A%22python_http_test2%22%2C%22data%22%3A%7B%22symbol_list%22%3A%5B%7B%22code%22%3A%20%22700.HK%22%7D%2C%7B%22code%22%3A%20%22UNH.US%22%7D%2C%7B%22code%22%3A%20%22600416.SH%22%7D%5D%7D%7D'
 
resp1 = requests.get(url=test_url1, headers=test_headers)
 
# Decoded text returned by the request
text1 = resp1.text
print(text1)

上面代码中 symbol_list 是可以同时传入多个的,分别传入不同的市场的产品也是可以的。

获取最新盘口报价数据

import time
import requests
import json
 
# Extra headers
test_headers = {
    'Content-Type':'application/json'
}
 
'''
github: https://github.com/alltick/realtime-forex-crypto-stock-tick-finance-websocket-api
申请免费 token: https://alltick.co/register
官网: https://alltick.co
将如下 JSON 进行 url 的 encode ,复制到 http 的查询字符串的 query 字段里
{"trace":"python_http_test2","data":{"symbol_list":[{"code": "700.HK"},{"code": "UNH.US"},{"code": "600416.SH"}]}}
'''
test_url1 = 'https://quote.tradeswitcher.com/quote-stock-b-api/depth-tick?token=e945d7d9-9e6e-4721-922a-7251a9d311d0-1678159756806&query=%7B%22trace%22%3A%22python_http_test2%22%2C%22data%22%3A%7B%22symbol_list%22%3A%5B%7B%22code%22%3A%20%22700.HK%22%7D%2C%7B%22code%22%3A%20%22UNH.US%22%7D%2C%7B%22code%22%3A%20%22600416.SH%22%7D%5D%7D%7D'
 
resp1 = requests.get(url=test_url1, headers=test_headers)
 
# Decoded text returned by the request
text1 = resp1.text
print(text1)

同上,symbol_list 支持传入多个值。

通过 Websocket 订阅实时行情数据

import json
import websocket    # pip install websocket-client
 
'''
github: https://github.com/alltick/realtime-forex-crypto-stock-tick-finance-websocket-api
申请免费 token: https://alltick.co/register
官网: https://alltick.co
'''
 
class Feed(object):
 
    def __init__(self):
        self.url = 'wss://quote.tradeswitcher.com/quote-stock-b-ws-api?token=e945d7d9-9e6e-4721-922a-7251a9d311d0-1678159756806'  # 这里输入 websocket 的 url
        self.ws = None
 
    def on_open(self, ws):
        """
        Callback object which is called at opening websocket.
        1 argument:
        @ ws: the WebSocketApp object
        """
        print('A new WebSocketApp is opened!')
 
        # 开始订阅(举个例子)
        sub_param = {
            "cmd_id": 22002, 
            "seq_id": 123,
            "trace":"3baaa938-f92c-4a74-a228-fd49d5e2f8bc-1678419657806",
            "data":{
                "symbol_list":[
                    {
                        "code": "700.HK",
                        "depth_level": 5,
                    },
                    {
                        "code": "UNH.US",
                        "depth_level": 5,
                    },
                    {
                        "code": "600416.SH",
                        "depth_level": 5,
                    }
                ]
            }
        }
        
        #如果希望长时间运行,除了需要发送订阅之外,还需要修改代码,定时发送心跳,避免连接断开,具体查看接口文档
        sub_str = json.dumps(sub_param)
        ws.send(sub_str)
        print("depth quote are subscribed!")
 
    def on_data(self, ws, string, type, continue_flag):
        """
        4 argument.
        The 1st argument is this class object.
        The 2nd argument is utf-8 string which we get from the server.
        The 3rd argument is data type. ABNF.OPCODE_TEXT or ABNF.OPCODE_BINARY will be came.
        The 4th argument is continue flag. If 0, the data continue
        """
 
    def on_message(self, ws, message):
        """
        Callback object which is called when received data.
        2 arguments:
        @ ws: the WebSocketApp object
        @ message: utf-8 data received from the server
        """
        # 对收到的 message 进行解析
        result = eval(message)
        print(result)
 
    def on_error(self, ws, error):
        """
        Callback object which is called when got an error.
        2 arguments:
        @ ws: the WebSocketApp object
        @ error: exception object
        """
        print(error)
 
    def on_close(self, ws, close_status_code, close_msg):
        """
        Callback object which is called when the connection is closed.
        2 arguments:
        @ ws: the WebSocketApp object
        @ close_status_code
        @ close_msg
        """
        print('The connection is closed!')
 
    def start(self):
        self.ws = websocket.WebSocketApp(
            self.url,
            on_open=self.on_open,
            on_message=self.on_message,
            on_data=self.on_data,
            on_error=self.on_error,
            on_close=self.on_close,
        )
        self.ws.run_forever()
 
 
if __name__ == "__main__":
    feed = Feed()
    feed.start()

上面的代码中 symbol_list 代表你要订阅的产品列表,可以同时传入多个市场的多个产品,cmd_id=22002 是订阅盘口数据,当 cmd_id=22004 时订阅的是成交报价,一旦订阅成功,实时股票行情数据就会源源不断的推送过来,并且是及时的。

  • 游客注册

    游客注册

  • 会员

  • 最新的状态更新

    没有最新的状态更新
  • 最近查看

    • 没有会员查看此页面.
×
×
  • 创建新的...