首页
学习
活动
专区
圈层
工具
发布
首页
学习
活动
专区
圈层
工具
MCP广场
社区首页 >问答首页 >如何使用OKEx SHA256加密和Base64编码用websockets登录Base64 API V5?

如何使用OKEx SHA256加密和Base64编码用websockets登录Base64 API V5?
EN

Stack Overflow用户
提问于 2022-04-14 15:36:18
回答 1查看 1.1K关注 0票数 1

虽然使用REST的OKEx API的早期版本已经回答了这个问题,但是使用websockets的最新版本5没有回答这个问题。医生来了

我得到了下面的错误{"event":"error","msg":"Invalid sign","code":"60007"},所以签名字符串算法肯定有问题,但我似乎无法确定我在哪里出错。

代码语言:javascript
运行
复制
import hmac
import json
import time
import hashlib
import asyncio
import websockets

passphrase = "XXXX"
secret_key = b"XXXX"
api_key = "XXXX"

timestamp = int(time.time())
print("timestamp: " + str(timestamp))
sign = str(timestamp) + 'GET' + '/users/self/verify'
total_params = bytes(sign, encoding= 'utf-8')
signature = hmac.new(bytes(secret_key, encoding= 'utf-8'), total_params, digestmod=hashlib.sha256).digest()
signature = base64.b64encode(signature)
print("signature = {0}".format(signature))

async def main():
    msg = \
    {
      "op": "login",
      "args": [
        {
          "apiKey": f'{api_key}',
          "passphrase": f'{passphrase}',
          "timestamp": f'{timestamp}',
          "sign": f'{signature}'
        }
      ]
    }

    async with websockets.connect('wss://wspap.okx.com:8443/ws/v5/private?brokerId=9999') as websocket:
        print(msg)
        await websocket.send(json.dumps(msg))
        response = await websocket.recv()
        print(response)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
EN

回答 1

Stack Overflow用户

回答已采纳

发布于 2022-04-14 19:00:14

我想通了。在发送签名之前,需要将其从字节字符串varible转换回字符串变量。

添加以下代码行可执行以下操作:signature = str(signature, 'utf-8')

代码语言:javascript
运行
复制
import json
import time
import hmac
import hashlib
import base64
import asyncio
import websockets

passphrase = "XXXX"
secret_key = "XXXX"
api_key = "XXXX"

timestamp = int(time.time())
print("timestamp: " + str(timestamp))
sign = str(timestamp) + 'GET' + '/users/self/verify'
total_params = bytes(sign, encoding= 'utf-8')
signature = hmac.new(bytes(secret_key, encoding= 'utf-8'), total_params, digestmod=hashlib.sha256).digest()
signature = base64.b64encode(signature)
signature = str(signature, 'utf-8')

print("signature = {0}".format(signature))

async def main():
    msg = \
    {
      "op": "login",
      "args": [
        {
          "apiKey": f'{api_key}',
          "passphrase": f'{passphrase}',
          "timestamp": f'{timestamp}',
          "sign": f'{signature}'
        }
      ]
    }

    async with websockets.connect('wss://wspap.okx.com:8443/ws/v5/private?brokerId=9999') as websocket:
        print(msg)
        await websocket.send(json.dumps(msg))
        response = await websocket.recv()
        print(response)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())
票数 0
EN
页面原文内容由Stack Overflow提供。腾讯云小微IT领域专用引擎提供翻译支持
原文链接:

https://stackoverflow.com/questions/71874196

复制
相关文章

相似问题

领券
问题归档专栏文章快讯文章归档关键词归档开发者手册归档开发者手册 Section 归档