跳转至

错误处理指南

bt_api_py 的错误处理和异常管理最佳实践。


异常类型

内置异常类

异常类 继承自 说明
BtApiError Exception 所有 bt_api_py 异常的基类
ExchangeNotFoundError BtApiError 交易所不存在或未初始化
ConnectionError BtApiError 连接错误
RateLimitError BtApiError 请求频率限制
OrderError BtApiError 订单操作错误
AuthenticationError BtApiError 认证失败

订单状态

状态 说明 处理建议
new 订单已接受 等待成交
partially_filled 部分成交 可继续等待或撤销
filled 完全成交 订单完成
canceled 已撤销 订单已取消
rejected 已拒绝 检查订单参数
expired 已过期 重新下单

错误捕获

基础异常捕获

from bt_api_py import BtApi
from bt_api_py.exceptions import BtApiError, ExchangeNotFoundError

try:
    ticker = api.get_tick("BINANCE___SPOT", "BTCUSDT")
    ticker.init_data()
    print(ticker.get_last_price())
except ExchangeNotFoundError as e:
    print(f"交易所不存在: {e}")
except BtApiError as e:
    print(f"API 错误: {e}")
except Exception as e:
    print(f"未知错误: {e}")

精细化错误处理

def safe_api_call(func, **args, **kwargs):
    """安全的 API 调用包装器"""
    try:
        result = func(**args, **kwargs)
        if hasattr(result, 'init_data'):
            result.init_data()
        return result
    except ExchangeNotFoundError:
        print("错误: 交易所未初始化")
        return None
    except ConnectionError:
        print("错误: 连接失败,请检查网络")
        return None
    except RateLimitError:
        print("错误: 请求过于频繁,请稍后重试")
        return None
    except OrderError as e:
        print(f"订单错误: {e}")
        return None
    except Exception as e:
        print(f"未知错误: {e}")
        return None

# 使用

ticker = safe_api_call(api.get_tick, "BINANCE___SPOT", "BTCUSDT")
if ticker:
    print(ticker.get_last_price())

上下文管理器

from contextlib import contextmanager

@contextmanager
def error_handler(api, exchange_name):
    """错误处理上下文管理器"""
    try:
        yield
    except Exception as e:
        api.log(f"操作失败: {e}", level="error")

# 可以在这里添加错误恢复逻辑
        raise

# 使用

with error_handler(api, "BINANCE___SPOT"):
    ticker = api.get_tick("BINANCE___SPOT", "BTCUSDT")
    ticker.init_data()

重试机制

指数退避重试

import time

def retry_api_call(func, max_retries=3, initial_delay=1, backoff_factor=2):
    """带指数退避的重试装饰器"""
    def wrapper(**args, **kwargs):
        delay = initial_delay
        last_exception = None

        for attempt in range(max_retries):
            try:
                return func(**args, **kwargs)
            except (ConnectionError, RateLimitError) as e:
                last_exception = e
                if attempt < max_retries - 1:
                    print(f"请求失败,{delay}秒后重试... (尝试 {attempt + 1}/{max_retries})")
                    time.sleep(delay)
                    delay *= backoff_factor
                else:
                    break
            except Exception:

# 非网络错误不重试
                raise

        raise last_exception

    return wrapper

# 使用

@retry_api_call(max_retries=5)
def get_ticker_with_retry(symbol):
    ticker = api.get_tick("BINANCE___SPOT", symbol)
    ticker.init_data()
    return ticker

条件重试

def retry_until_success(func, condition, max_attempts=10, delay=1):
    """直到满足条件或达到最大尝试次数"""
    for attempt in range(max_attempts):
        try:
            result = func()
            if condition(result):
                return result
            print(f"条件不满足,重试 {attempt + 1}/{max_attempts}")
        except Exception as e:
            print(f"尝试 {attempt + 1} 失败: {e}")

        if attempt < max_attempts - 1:
            time.sleep(delay)

    return None

# 使用

def check_order_filled(order):
    return order.get_order_status() == "filled"

order = retry_until_success(
    lambda: api.query_order("BINANCE___SPOT", "BTCUSDT", "123456"),
    check_order_filled
)

日志记录

配置日志

from bt_api_py.functions.log_message import SpdLogManager

# 创建日志记录器

logger = SpdLogManager(
    file_name='bt_api.log',
    logger_name="api",
    print_info=True
).create_logger()

# 添加日志处理器

import logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

结构化日志

import logging
import json

class StructuredLogger:
    def __init__(self, name):
        self.logger = logging.getLogger(name)

    def log_api_call(self, method, exchange, symbol, **kwargs):
        """记录 API 调用"""
        self.logger.info(json.dumps({
            "event": "api_call",
            "method": method,
            "exchange": exchange,
            "symbol": symbol,
            "params": kwargs
        }))

    def log_error(self, error, context=None):
        """记录错误"""
        self.logger.error(json.dumps({
            "event": "error",
            "error_type": type(error).__name__,
            "error_message": str(error),
            "context": context
        }))

# 使用

logger = StructuredLogger("bt_api")
try:
    ticker = api.get_tick("BINANCE___SPOT", "BTCUSDT")
except Exception as e:
    logger.log_error(e, context={"exchange": "BINANCE___SPOT", "symbol": "BTCUSDT"})

性能日志

import time
from functools import wraps

def log_performance(func):
    """记录函数执行时间"""
    @wraps(func)
    def wrapper(**args, **kwargs):
        start_time = time.time()
        try:
            result = func(**args, **kwargs)
            elapsed = time.time() - start_time
            api.log(f"{func.__name__} 成功,耗时: {elapsed:.3f}秒")
            return result
        except Exception as e:
            elapsed = time.time() - start_time
            api.log(f"{func.__name__} 失败,耗时: {elapsed:.3f}秒,错误: {e}")
            raise
    return wrapper

# 使用

@log_performance
def get_ticker(symbol):
    return api.get_tick("BINANCE___SPOT", symbol)

常见错误

网络错误

# 错误: ConnectionError / Timeout

# 原因: 网络不稳定或服务器无响应

# 处理: 重试机制

def handle_network_error():
    max_retries = 3
    for attempt in range(max_retries):
        try:
            ticker = api.get_tick("BINANCE___SPOT", "BTCUSDT")
            return ticker
        except ConnectionError:
            if attempt < max_retries - 1:
                time.sleep(2 ** attempt)  # 1, 2, 4 秒
            else:
                raise

认证错误

# 错误: AuthenticationError / 401

# 原因: API Key 错误或过期

# 处理: 检查配置并提示用户

def check_auth_config(api, exchange_name):
    """检查认证配置"""
    try:
        api.get_account(exchange_name)
        print("认证成功")
        return True
    except AuthenticationError:
        print(f"认证失败,请检查 {exchange_name} 的 API Key 配置")
        return False

频率限制

# 错误: RateLimitError / 429

# 原因: 请求过于频繁

# 处理: 添加请求限流

from threading import Lock
import time

class RateLimiter:
    def __init__(self, max_calls, time_window):
        self.max_calls = max_calls
        self.time_window = time_window
        self.calls = []
        self.lock = Lock()

    def acquire(self):
        with self.lock:
            now = time.time()

# 移除时间窗口外的记录
            self.calls = [t for t in self.calls if now - t < self.time_window]

            if len(self.calls) >= self.max_calls:
                sleep_time = self.time_window - (now - self.calls[0])
                if sleep_time > 0:
                    time.sleep(sleep_time)
                    self.calls = []

            self.calls.append(now)

# 使用

limiter = RateLimiter(max_calls=10, time_window=1)  # 每秒最多 10 次

def rate_limited_get_tick(symbol):
    limiter.acquire()
    return api.get_tick("BINANCE___SPOT", symbol)

订单错误

# 错误: OrderError

# 原因: 余额不足、价格偏离等

# 处理: 检查错误类型并处理

def handle_order_error(order_response):
    """处理订单错误"""
    if order_response.get("code") == -2010:  # 余额不足
        print("余额不足,请检查账户")
    elif order_response.get("code") == -2011:  # 订单不存在
        print("订单不存在")
    elif order_response.get("code") == -2019:  # 超过自成交限制
        print("超过自成交限制")
    else:
        print(f"订单错误: {order_response.get('msg')}")

数据初始化错误

# 错误: 数据未初始化

# 原因: 忘记调用 init_data()

# 处理: 封装安全访问

class SafeDataAccess:
    @staticmethod
    def get_price(ticker):
        """安全获取价格"""
        try:
            ticker.init_data()
            return ticker.get_last_price()
        except AttributeError:
            print("警告: 数据未初始化")
            return None

# 使用

ticker = api.get_tick("BINANCE___SPOT", "BTCUSDT")
price = SafeDataAccess.get_price(ticker)

最佳实践

1. 统一异常处理

class ApiErrorHandler:
    """统一 API 错误处理器"""

    def __init__(self, api):
        self.api = api
        self.error_count = {}
        self.max_retries = 3

    def call(self, func, **args, **kwargs):
        """统一的 API 调用入口"""
        exchange = kwargs.get('exchange_name') or args[0] if args else None

        for attempt in range(self.max_retries):
            try:
                result = func(**args, **kwargs)
                if hasattr(result, 'init_data'):
                    result.init_data()

# 成功后重置错误计数
                if exchange:
                    self.error_count[exchange] = 0
                return result
            except ConnectionError as e:
                if attempt < self.max_retries - 1:
                    self.log_and_retry(exchange, e, attempt)
                    time.sleep(2 ** attempt)
                else:
                    self.handle_final_error(exchange, e)
                    raise
            except RateLimitError as e:
                self.handle_rate_limit(exchange, e)
                time.sleep(1)
            except Exception as e:
                self.handle_final_error(exchange, e)
                raise

    def log_and_retry(self, exchange, error, attempt):
        """记录并重试"""
        if exchange:
            self.error_count[exchange] = self.error_count.get(exchange, 0) + 1
        self.api.log(f"错误 (尝试 {attempt + 1}): {error}")

    def handle_rate_limit(self, exchange, error):
        """处理频率限制"""
        self.api.log(f"频率限制: {exchange}, 暂停 10 秒")
        time.sleep(10)

    def handle_final_error(self, exchange, error):
        """处理最终错误"""
        self.api.log(f"操作失败: {exchange}, 错误: {error}", level="error")

# 使用

handler = ApiErrorHandler(api)
ticker = handler.call(api.get_tick, "BINANCE___SPOT", "BTCUSDT")

2. 断路器模式

class CircuitBreaker:
    """断路器:防止连续失败的系统被继续调用"""

    def __init__(self, failure_threshold=5, timeout=60):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.failures = {}
        self.last_failure_time = {}

    def call(self, func, exchange, **args, **kwargs):
        """通过断路器调用函数"""

# 检查断路器状态
        if self.is_open(exchange):
            raise Exception(f"断路器开启: {exchange}")

        try:
            result = func(**args, **kwargs)
            if hasattr(result, 'init_data'):
                result.init_data()
            self.on_success(exchange)
            return result
        except Exception as e:
            self.on_failure(exchange)
            raise

    def is_open(self, exchange):
        """检查断路器是否开启"""
        if exchange not in self.failures:
            return False

        failures = self.failures[exchange]
        last_time = self.last_failure_time.get(exchange, 0)

        if failures >= self.failure_threshold:
            if time.time() - last_time < self.timeout:
                return True
            else:

# 超时后重置
                self.reset(exchange)
                return False
        return False

    def on_success(self, exchange):
        """成功时重置"""
        if exchange in self.failures:
            del self.failures[exchange]

    def on_failure(self, exchange):
        """失败时增加计数"""
        self.failures[exchange] = self.failures.get(exchange, 0) + 1
        self.last_failure_time[exchange] = time.time()

    def reset(self, exchange):
        """重置断路器"""
        if exchange in self.failures:
            del self.failures[exchange]

# 使用

breaker = CircuitBreaker(failure_threshold=5, timeout=60)

def protected_call(symbol):
    return breaker.call(api.get_tick, "BINANCE___SPOT", symbol)

3. 错误通知

class ErrorNotifier:
    """错误通知器"""

    def __init__(self, api):
        self.api = api
        self.errors = []

    def notify(self, error, context=None):
        """发送通知"""
        error_info = {
            "timestamp": time.time(),
            "error": str(error),
            "type": type(error).__name__,
            "context": context
        }
        self.errors.append(error_info)

# 严重错误立即通知
        if isinstance(error, (AuthenticationError, OrderError)):
            self.send_alert(error_info)

    def send_alert(self, error_info):
        """发送告警(可扩展为邮件、钉钉等)"""
        print(f"⚠️ 告警: {error_info}")

完整示例

from bt_api_py import BtApi
from bt_api_py.exceptions import BtApiError, ExchangeNotFoundError, ConnectionError

class SafeBtApi:
    """带完整错误处理的 API 包装类"""

    def __init__(self, exchange_kwargs):
        self.api = BtApi(exchange_kwargs)
        self.circuit_breaker = CircuitBreaker()
        self.error_handler = ApiErrorHandler(self.api)

    def get_tick(self, exchange_name, symbol):
        """安全获取行情"""
        def _call():
            return self.error_handler.call(
                self.api.get_tick, exchange_name, symbol
            )
        return self.circuit_breaker.call(_call, exchange_name)

    def make_order(self, exchange_name, symbol, volume, price, order_type, **kwargs):
        """安全下单"""
        def _call():
            order = self.error_handler.call(
                self.api.make_order,
                exchange_name, symbol, volume, price, order_type,

                - *kwargs

            )

# 验证订单
            if order.get_order_status() == "rejected":
                raise OrderError(f"订单被拒绝: {order.get_order_id()}")
            return order
        return self.circuit_breaker.call(_call, exchange_name)

# 使用

safe_api = SafeBtApi(exchange_kwargs={...})
ticker = safe_api.get_tick("BINANCE___SPOT", "BTCUSDT")

相关文档