使用pytdx获取股票信息总结

行情接口

pytdx中提供了hq(标准行情)及exhq(扩展市场行情)两种接口,扩展市场目前已经失效无法使用。

标准行情

这里只提供代码,见附录。

对接总结

除了一开始不熟悉股票领域的相关定义,目前是遇到一个概念去查询一个概念,这个只有慢慢来了。

界面展示

目前采用odoo框架来存储和展示获取下来的相关信息。
体验地址:http://111.229.103.209:8090/
用户名:john,密码:123456
在这里插入图片描述

性能问题

目前是获取了深市和沪市的全部5300+股票的所有数据,历史交易数据打算只获取9月1号以后的。后续真实的使用,肯定是选定部分股票去获取数据。

数据可靠性

数据获取太频繁的时候,会遇到timeout,于是统一都增加了重试机制

附录代码

# -*- coding: utf-8 -*-
import time
from pytdx.hq import TdxHq_API


class Tdx_Client():

    def __init__(self, ip='111.229.247.189', port='7709'):
        self.ip = ip
        self.port = port
        self.api = TdxHq_API(auto_retry=True, raise_exception=False)
        self.api.connect(self.ip, int(self.port))

    def get_stock_quotes(self, all_stock, code=None):
        """
        获取股票行情
        """
        for retry in range(3):
            try:
                if self.api.connect(self.ip, int(self.port)):
                    data = self.api.get_security_quotes(all_stock, code)
                    return data or []
            except Exception as e:
                time.sleep(3)
        return []

    def get_stock_bars(self, mkt_id, code, start, size):
        """
        获取k线
        """
        for retry in range(3):
            try:
                if self.api.connect(self.ip, int(self.port)):
                    data = self.api.get_security_bars(mkt_id, code, start, size)
                    return data
            except Exception as e:
                time.sleep(3)
        return None

    def get_stock_count(self, mkt_id):
        """
        获取股票数量
        """
        for retry in range(3):
            try:
                if self.api.connect(self.ip, int(self.port)):
                    data = self.api.get_security_count(mkt_id)
                    return data
            except Exception as e:
                time.sleep(3)
        return None

    def get_stock_list(self, mkt_id, start):
        """
        获取股票列表
        """
        for retry in range(3):
            try:
                if self.api.connect(self.ip, int(self.port)):
                    data = self.api.get_security_list(mkt_id, start)
                    return data
            except Exception as e:
                time.sleep(3)
        return None

    def get_index_bars(self, category, market, code, start, count):
        """
        获取指数k线
        n数据类型:
        0 5分钟K线
        1 15分钟K线
        2 30分钟K线
        3 1小时K线
        4 日K线
        7 1分钟
        8 1分钟K线
        """
        for retry in range(3):
            try:
                if self.api.connect(self.ip, int(self.port)):
                    data = self.api.get_index_bars(category, market, code, start, count)
                    return data
            except Exception as e:
                time.sleep(3)
        return None

    def get_minute_time_data(self, mkt_id, code):
        """
        查询分时行情
        """
        for retry in range(3):
            try:
                if self.api.connect(self.ip, int(self.port)):
                    data = self.api.get_minute_time_data(mkt_id, code)
                    return data
            except Exception as e:
                time.sleep(3)
        return None

    def get_history_minute_time_data(self, mkt_id, code, date):
        """
        查询历史分时行情
        """
        if isinstance(date, str):
            date = int(date)
        else:
            date = int(date.strftime("%Y%m%d"))
        for retry in range(3):
            try:
                if self.api.connect(self.ip, int(self.port)):
                    data = self.api.get_history_minute_time_data(
                        mkt_id, code, date)
                    return data
            except Exception as e:
                time.sleep(3)
        return None

    def get_transaction_data(self, market, code, start, count):
        """
        查询分时成交
        """
        for retry in range(3):
            try:
                if self.api.connect(self.ip, int(self.port)):
                    data = self.api.get_transaction_data(market, code, start, count)
                    return data
            except Exception as e:
                time.sleep(3)
        return None

    def get_history_transaction_data(self, market, code, start, count, date):
        """
        查询历史分时成交
        """
        if isinstance(date, str):
            date = int(date)
        else:
            date = int(date.strftime("%Y%m%d"))
        for retry in range(3):
            try:
                if self.api.connect(self.ip, int(self.port)):
                    data = self.api.get_history_transaction_data(
                        market, code, start, count, date)
                    return data
            except Exception as e:
                time.sleep(3)
        return None

    def get_company_info_category(self, market, code):
        """
        查询公司信息目录
        """
        for retry in range(3):
            try:
                if self.api.connect(self.ip, int(self.port)):
                    data = self.api.get_company_info_category(market, code)
                    return data
            except Exception as e:
                time.sleep(3)
        return None

    def get_company_info_content(self, market, code, filename, start, length):
        """
        读取公司信息-最新提示
        """
        for retry in range(3):
            try:
                if self.api.connect(self.ip, int(self.port)):
                    data = self.api.get_company_info_content(market, code, filename, start, length)
                    return data
            except Exception as e:
                time.sleep(3)
        return None

    def get_xdxr_info(self, market, code):
        """
        读取除权除息
        """
        for retry in range(3):
            try:
                if self.api.connect(self.ip, int(self.port)):
                    data = self.api.get_xdxr_info(market, code)
                    return data
            except Exception as e:
                time.sleep(3)
        return None

    def get_finance_info(self, market, code):
        """
        获取财务信息
        """
        for retry in range(3):
            try:
                if self.api.connect(self.ip, int(self.port)):
                    data = self.api.get_finance_info(market, code)
                    return data
            except Exception as e:
                time.sleep(3)
        return {}

    def get_k_data(self, code, start, end):
        """
        日线级别k线获取函数
        """
        for retry in range(3):
            try:
                if self.api.connect(self.ip, int(self.port)):
                    df = self.api.get_k_data(code, start, end)
                    return df.to_dict('records') or []
            except Exception as e:
                time.sleep(3)
        return []

    def get_and_parse_block_info(self, blockfile):
        """
        获取板块信息
        """
        for retry in range(3):
            try:
                if self.api.connect(self.ip, int(self.port)):
                    data = self.api.get_and_parse_block_info(blockfile)
                    return data
            except Exception as e:
                time.sleep(3)
        return None
评论
添加红包

请填写红包祝福语或标题

红包个数最小为10个

红包金额最低5元

当前余额3.43前往充值 >
需支付:10.00
成就一亿技术人!
领取后你会自动成为博主和红包主的粉丝 规则
hope_wisdom
发出的红包
实付
使用余额支付
点击重新获取
扫码支付
钱包余额 0

抵扣说明:

1.余额是钱包充值的虚拟货币,按照1:1的比例进行支付金额的抵扣。
2.余额无法直接购买下载,可以购买VIP、付费专栏及课程。

余额充值