
BGUSER-NXB07P6L
2025/08/10 07:09
1
# -*- coding: utf-8 -*-
import hmac, base64, hashlib, json, time, asyncio, datetime as dt
import requests, websockets
########################
# ==== 配置:硬编码密钥(在本地替换占位符) ==== #
########################
OKX_API_KEY = "REPLACE_WITH_YOUR_OKX_API_KEY"
OKX_API_SECRET = "REPLACE_WITH_YOUR_OKX_API_SECRET"
OKX_API_PASSPHRASE = "REPLACE_WITH_YOUR_OKX_API_PASSPHRASE"
BG_API_KEY = "REPLACE_WITH_YOUR_BITGET_API_KEY"
BG_API_SECRET = "REPLACE_WITH_YOUR_BITGET_API_SECRET"
BG_PASSPHRASE = "REPLACE_WITH_YOUR_BITGET_PASSPHRASE"
########################
# ==== 交易/策略参数 ==== #
########################
# 标的
OKX_INST_ID = "BTC-USDT-SWAP" # OKX 永续
BG_SYMBOL = "BTCUSDT" # Bitget USDT本位永续
BG_PRODUCT = "USDT-FUTURES" # Bitget 产品线
# 阈值
OPEN_EQUALITY_USD = 2.0 # |价差| <= 2 开仓锁价
CLOSE_SPREAD_USD = 18.0 # |价差| >= 18 平仓
USE_MARK_PRICE = True # True=标记价;False=最新成交价
# 手数/张数(先小量测试)
OKX_SZ = "10"
BG_SIZE = "10"
# 开仓方向(默认 OKX 多 / Bitget 空;可改)
OPEN_OKX_SIDE = "buy" # buy/sell
OPEN_BG_SIDE = "sell" # buy/sell
OKX_TD_MODE = "cross" # cross/isolated
OKX_POS_SIDE = "long" if OPEN_OKX_SIDE=="buy" else "short"
BG_MARGIN_COIN = "USDT"
# REST & WS
OKX_BASE = "https://www.okx.com"
BG_BASE = "https://api.bitget.com"
OKX_WS_PUBLIC = "wss://ws.okx.com:8443/ws/v5/public"
BG_WS_PUBLIC = "wss://ws.bitget.com/v2/ws/public"
# 运行控制
DRY_RUN = False # False=真下单(先小仓验证)
REQUEST_TIMEOUT = 10
WS_RETRY_DELAY = 2
########################
# ==== 工具函数 ==== #
########################
def iso_ts_ms_str():
return str(int(time.time() * 1000))
def okx_headers(method, path, body):
ts = dt.datetime.utcnow().isoformat(timespec="milliseconds") + "Z"
prehash = f"{ts}{method.upper()}{path}{body}"
sign = base64.b64encode(hmac.new(OKX_API_SECRET.encode(), prehash.encode(), hashlib.sha256).digest()).decode()
return {
"OK-ACCESS-KEY": OKX_API_KEY,
"OK-ACCESS-SIGN": sign,
"OK-ACCESS-TIMESTAMP": ts,
"OK-ACCESS-PASSPHRASE": OKX_API_PASSPHRASE,
"Content-Type": "application/json"
}
def bg_sign(ts_ms:str, method:str, path:str, body:str, secret:str)->str:
msg = f"{ts_ms}{method.upper()}{path}{body}".encode()
sign = hmac.new(secret.encode(), msg, hashlib.sha256).digest()
return base64.b64encode(sign).decode()
def bg_headers(method, path, body):
ts = iso_ts_ms_str()
sign = bg_sign(ts, method, path, body, BG_API_SECRET)
return {
"ACCESS-KEY": BG_API_KEY,
"ACCESS-SIGN": sign,
"ACCESS-PASSPHRASE": BG_PASSPHRASE,
"ACCESS-TIMESTAMP": ts,
"Content-Type": "application/json"
}
########################
# ==== 下单/平仓(OKX) ==== #
########################
def okx_place_market(instId, tdMode, side, posSide, sz):
path = "/api/v5/trade/order"
url = OKX_BASE + path
body = {
"instId": instId,
"tdMode": tdMode,
"side": side, # buy/sell
"posSide": posSide, # long/short(对冲)
"ordType": "market",
"sz": str(sz)
}
data = json.dumps(body, separators=(",",":"))
if DRY_RUN:
print("[DRYRUN][OKX] place", data)
return {"dryrun":True, "req":body}
r = requests.post(url, headers=okx_headers("POST", path, data), data=data, timeout=REQUEST_TIMEOUT)
r.raise_for_status()
return r.json()
def okx_close_market(instId, tdMode, posSide, sz):
side = "sell" if posSide=="long" else "buy"
return okx_place_market(instId, tdMode, side, posSide, sz)
########################
# ==== 下单/平仓(Bitget v2) ==== #
########################
def bg_place_market(symbol, productType, marginCoin, side, tradeSide, size):
path = "/api/v2/mix/order/place-order"
url = BG_BASE + path
body = {
"symbol": symbol,
"productType": productType, # USDT-FUTURES
"marginCoin": marginCoin, # USDT
"side": side, # buy/sell
"tradeSide": tradeSide, # open/close
"orderType": "market",
"size": str(size),
"clientOid": f"arb_{int(time.time()*1000)}"
}
data = json.dumps(body, separators=(",",":"))
if DRY_RUN:
print("[DRYRUN][Bitget] place", data)
return {"dryrun":True, "req":body}
r = requests.post(url, headers=bg_headers("POST", path, data), data=data, timeout=REQUEST_TIMEOUT)
r.raise_for_status()
return r.json()
def bg_open(symbol, productType, marginCoin, side, size):
return bg_place_market(symbol, productType, marginCoin, side, "open", size)
def bg_close(symbol, productType, marginCoin, side, size):
opp_side = "buy" if side=="sell" else "sell"
return bg_place_market(symbol, productType, marginCoin, opp_side, "close", size)
########################
# ==== 价格源(WS) ==== #
########################
class PriceFeed:
def __init__(self):
self.okx_price = None
self.bg_price = None
def okx_channel(self):
if USE_MARK_PRICE:
return {"op":"subscribe","args":[{"channel":"mark-price","instId":OKX_INST_ID}]}
else:
return {"op":"subscribe","args":[{"channel":"tickers","instId":OKX_INST_ID}]}
def bg_channel(self):
channel = "markPrice" if USE_MARK_PRICE else "ticker"
return {"op":"subscribe","args":[{"instType":"USDT-FUTURES","channel":channel,"instId":BG_SYMBOL}]}
async def okx_loop(self):
while True:
try:
async with websockets.connect(OKX_WS_PUBLIC, ping_interval=20) as ws:
await ws.send(json.dumps(self.okx_channel()))
async for msg in ws:
data = json.loads(msg)
if data.get("event")=="subscribe":
print("[OKX] subscribed")
elif "data" in data:
d = data["data"][0]
self.okx_price = float(d["markPx"] if USE_MARK_PRICE else d["last"])
except Exception as e:
print("[OKX WS] error:", e)
await asyncio.sleep(WS_RETRY_DELAY)
async def bg_loop(self):
while True:
try:
async with websockets.connect(BG_WS_PUBLIC, ping_interval=20) as ws:
await ws.send(json.dumps(self.bg_channel()))
async for msg in ws:
data = json.loads(msg)
if data.get("event")=="subscribe":
print("[Bitget] subscribed")
elif "arg" in data and "data" in data:
d = data["data"][0]
if USE_MARK_PRICE:
self.bg_price = float(d.get("markPrice") or d.get("price"))
else:
self.bg_price = float(d.get("last") or d.get("price"))
except Exception as e:
print("[Bitget WS] error:", e)
await asyncio.sleep(WS_RETRY_DELAY)
########################
# ==== 策略执行 ==== #
########################
class ArbState:
def __init__(self):
self.has_position = False
self.entry_spread = None
self.okx_side_open = OPEN_OKX_SIDE
self.bg_side_open = OPEN_BG_SIDE
async def main():
feed = PriceFeed()
state = ArbState()
tasks = [asyncio.create_task(feed.okx_loop()),
asyncio.create_task(feed.bg_loop())]
try:
while True:
await asyncio.sleep(0.2)
if feed.okx_price is None or feed.bg_price is None:
continue
spread = feed.okx_price - feed.bg_price # OKX - Bitget
now = dt.datetime.now().strftime("%H:%M:%S")
print(f"{now} P_okx={feed.okx_price:.2f} P_bg={feed.bg_price:.2f} spread={spread:.2f}", end="\r")
# 无持仓 → 锁价开仓
if not state.has_position and abs(spread) <= OPEN_EQUALITY_USD:
print("\n[OPEN] |spread|<=OPEN_EQUALITY_USD,尝试锁价开仓")
try:
okx_res = okx_place_market(OKX_INST_ID, OKX_TD_MODE, state.okx_side_open, OKX_POS_SIDE, OKX_SZ)
bg_res = bg_open(BG_SYMBOL, BG_PRODUCT, BG_MARGIN_COIN, state.bg_side_open, BG_SIZE)
print("[OKX OPEN RES]", okx_res)
print("[BG OPEN RES]", bg_res)
state.has_position = True
state.entry_spread = spread
except Exception as e:
print("[OPEN ERROR]", e)
# 有持仓 → 触发平仓
if state.has_position and abs(spread) >= CLOSE_SPREAD_USD:
print(f"\n[CLOSE] |spread|>=CLOSE_SPREAD_USD,开始同时平仓 | spread={spread:.2f}")
try:
okx_res = okx_close_market(OKX_INST_ID, OKX_TD_MODE, OKX_POS_SIDE, OKX_SZ)
bg_res = bg_close(BG_SYMBOL, BG_PRODUCT, BG_MARGIN_COIN, state.bg_side_open, BG_SIZE)
print("[OKX CLOSE RES]", okx_res)
print("[BG CLOSE RES]", bg_res)
pnl = (spread - (state.entry_spread or 0.0))
print(f"[PNL est] Δspread = {pnl:.2f} USD(未含费率/资金费/滑点)")
state.has_position = False
state.entry_spread = None
except Exception as e:
print("[CLOSE ERROR]", e)
finally:
for t in tasks:
t.cancel()
if __name__ == "__main__":
asyncio.run(main())