Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
import asyncio
import time
from typing import TYPE_CHECKING, Any, Dict, List, Optional

from hummingbot.connector.exchange.wazirx import wazirx_constants as CONSTANTS, wazirx_web_utils as web_utils
from hummingbot.connector.exchange.wazirx.wazirx_order_book import WazirxOrderBook
from hummingbot.core.data_type.order_book_message import OrderBookMessage
from hummingbot.core.data_type.order_book_tracker_data_source import OrderBookTrackerDataSource
from hummingbot.core.web_assistant.connections.data_types import RESTMethod, WSJSONRequest
from hummingbot.core.web_assistant.web_assistants_factory import WebAssistantsFactory
from hummingbot.core.web_assistant.ws_assistant import WSAssistant
from hummingbot.logger import HummingbotLogger

if TYPE_CHECKING:
from hummingbot.connector.exchange.wazirx.wazirx_exchange import WazirxExchange


class WazirxAPIOrderBookDataSource(OrderBookTrackerDataSource):
HEARTBEAT_TIME_INTERVAL = 30.0
ONE_HOUR = 60 * 60

_logger: Optional[HummingbotLogger] = None

def __init__(self,
trading_pairs: List[str],
connector: 'WazirxExchange',
api_factory: WebAssistantsFactory,
domain: str = CONSTANTS.DEFAULT_DOMAIN):
super().__init__(trading_pairs)
self._connector = connector
self._trade_messages_queue_key = CONSTANTS.TRADE_EVENT_TYPE
self._diff_messages_queue_key = CONSTANTS.DIFF_EVENT_TYPE
self._domain = domain
self._api_factory = api_factory

async def get_last_traded_prices(self,
trading_pairs: List[str],
domain: Optional[str] = None) -> Dict[str, float]:
return await self._connector.get_last_traded_prices(trading_pairs=trading_pairs)

async def _request_order_book_snapshot(self, trading_pair: str) -> Dict[str, Any]:
"""
Retrieves a copy of the full order book from the exchange, for a particular trading pair.

:param trading_pair: the trading pair for which the order book will be retrieved

:return: the response from the exchange (JSON dictionary)
"""
params = {
"symbol": await self._connector.exchange_symbol_associated_to_pair(trading_pair=trading_pair),
"limit": "1000"
}

rest_assistant = await self._api_factory.get_rest_assistant()
data = await rest_assistant.execute_request(
url=web_utils.public_rest_url(path_url=CONSTANTS.SNAPSHOT_PATH_URL, domain=self._domain),
params=params,
method=RESTMethod.GET,
throttler_limit_id=CONSTANTS.SNAPSHOT_PATH_URL,
)

return data

async def _subscribe_channels(self, ws: WSAssistant):
"""
Subscribes to the trade events and diff orders events through the provided websocket connection.
:param ws: the websocket assistant used to connect to the exchange
"""
try:
trade_params = []
depth_params = []
for trading_pair in self._trading_pairs:
symbol = await self._connector.exchange_symbol_associated_to_pair(trading_pair=trading_pair)
trade_params.append(f"{symbol.lower()}@trades")
depth_params.append(f"{symbol.lower()}@depth10@100ms")
payload = {
"event": "subscribe",
"streams": trade_params
}
subscribe_trade_request: WSJSONRequest = WSJSONRequest(payload=payload)

payload = {
"event": "subscribe",
"streams": depth_params
}
subscribe_orderbook_request: WSJSONRequest = WSJSONRequest(payload=payload)

await ws.send(subscribe_trade_request)
await ws.send(subscribe_orderbook_request)

self.logger().info("Subscribed to public order book and trade channels...")
except asyncio.CancelledError:
raise
except Exception:
self.logger().error(
"Unexpected error occurred subscribing to order book trading and delta streams...",
exc_info=True
)
raise

async def _connected_websocket_assistant(self) -> WSAssistant:
ws: WSAssistant = await self._api_factory.get_ws_assistant()
await ws.connect(ws_url=CONSTANTS.WSS_URL.format(self._domain),
ping_timeout=CONSTANTS.WS_HEARTBEAT_TIME_INTERVAL)
return ws

async def _order_book_snapshot(self, trading_pair: str) -> OrderBookMessage:
snapshot: Dict[str, Any] = await self._request_order_book_snapshot(trading_pair)
snapshot_timestamp: float = time.time()
snapshot_msg: OrderBookMessage = WazirxOrderBook.snapshot_message_from_exchange(
snapshot,
snapshot_timestamp,
metadata={"trading_pair": trading_pair}
)
return snapshot_msg

async def _parse_trade_message(self, raw_message: Dict[str, Any], message_queue: asyncio.Queue):
if "code" not in raw_message:
trading_pair = await self._connector.trading_pair_associated_to_exchange_symbol(symbol=raw_message["data"]["trades"][0]["s"])
trade_message = WazirxOrderBook.trade_message_from_exchange(
raw_message, {"trading_pair": trading_pair})
message_queue.put_nowait(trade_message)

async def _parse_order_book_diff_message(self, raw_message: Dict[str, Any], message_queue: asyncio.Queue):
if "code" not in raw_message:
trading_pair = await self._connector.trading_pair_associated_to_exchange_symbol(symbol=raw_message["data"]["s"])
order_book_message: OrderBookMessage = WazirxOrderBook.diff_message_from_exchange(
raw_message, time.time(), {"trading_pair": trading_pair})
message_queue.put_nowait(order_book_message)

def _channel_originating_message(self, event_message: Dict[str, Any]) -> str:
channel = ""
if "stream" in event_message:
event_type = event_message["stream"]
if "@depth10@100ms" in event_type:
symbol = event_message["data"]["s"]
elif "trades" in event_type:
symbol = event_message["data"]["trades"][0]["s"]
if event_type == CONSTANTS.TRADE_EVENT_TYPE.format(symbol):
channel = self._trade_messages_queue_key
if event_type == CONSTANTS.DIFF_EVENT_TYPE.format(symbol):
channel = self._diff_messages_queue_key
return channel
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import asyncio
from typing import TYPE_CHECKING, Any, Dict, Optional

from hummingbot.connector.exchange.wazirx import wazirx_constants as CONSTANTS, wazirx_web_utils as web_utils
from hummingbot.connector.exchange.wazirx.wazirx_auth import WazirxAuth
from hummingbot.core.data_type.user_stream_tracker_data_source import UserStreamTrackerDataSource
from hummingbot.core.web_assistant.connections.data_types import RESTMethod, WSJSONRequest
from hummingbot.core.web_assistant.web_assistants_factory import WebAssistantsFactory
from hummingbot.core.web_assistant.ws_assistant import WSAssistant
from hummingbot.logger import HummingbotLogger

if TYPE_CHECKING:
from hummingbot.connector.exchange.wazirx.wazirx_exchange import WazirxExchange


class WazirxAPIUserStreamDataSource(UserStreamTrackerDataSource):
_logger: Optional[HummingbotLogger] = None

def __init__(self,
auth: WazirxAuth,
connector: 'WazirxExchange',
api_factory: WebAssistantsFactory,
domain: str = CONSTANTS.DEFAULT_DOMAIN):

super().__init__()
self._auth: WazirxAuth = auth
self._api_factory = api_factory
self._connector = connector
self._domain = domain
self._current_auth_token: Optional[str] = None
self._api_factory = api_factory

self._listen_key_initialized_event: asyncio.Event = asyncio.Event()
self._last_listen_key_ping_ts = 0

async def _connected_websocket_assistant(self) -> WSAssistant:
ws: WSAssistant = await self._api_factory.get_ws_assistant()
await ws.connect(ws_url=CONSTANTS.WSS_URL, ping_timeout=CONSTANTS.PING_TIMEOUT)
return ws

@property
def last_recv_time(self):
if self._ws_assistant is None:
return 0
else:
return self._ws_assistant.last_recv_time

async def get_auth_token(self) -> str:
rest_assistant = await self._api_factory.get_rest_assistant()
try:
response_json = await rest_assistant.execute_request(
url=web_utils.public_rest_url(path_url=CONSTANTS.WAZIRX_USER_STREAM_PATH_URL, domain=self._domain),
method=RESTMethod.POST,
params={"recvWindow": 10000},
is_auth_required=True,
throttler_limit_id=CONSTANTS.WAZIRX_USER_STREAM_PATH_URL,
headers=self._auth.header_for_authentication()
)
except Exception:
raise
return response_json["auth_key"]

async def _subscribe_channels(self, websocket_assistant: WSAssistant):
try:
self._current_listen_key = await self.get_auth_token()
payload = {"event": "subscribe", "streams": ["orderUpdate", "ownTrade", "outboundAccountPosition"], "auth_key": self._current_listen_key}
subscribe_request = WSJSONRequest(payload)
await websocket_assistant.send(subscribe_request)

self.logger().info("Subscribed to private order changes and trades updates channels...")
except asyncio.CancelledError:
raise
except Exception:
self.logger().exception("Unexpected error occurred subscribing to user streams...")
raise

async def _process_event_message(self, event_message: Dict[str, Any], queue: asyncio.Queue):
if "streams" or "data" in event_message or event_message["stream"] in [
CONSTANTS.USER_TRADES_ENDPOINT_NAME,
CONSTANTS.USER_ORDERS_ENDPOINT_NAME,
CONSTANTS.USER_BALANCE_ENDPOINT_NAME
]:
queue.put_nowait(event_message)
else:
if event_message.get("errorMessage") is not None:
err_msg = event_message.get("errorMessage")
raise IOError({
"label": "WSS_ERROR",
"message": f"Error received via websocket - {err_msg}."
})

async def _send_ping(self, websocket_assistant: WSAssistant):
payload = {
"event": "ping",
}
ping_request: WSJSONRequest = WSJSONRequest(payload=payload)
await websocket_assistant.send(ping_request)
67 changes: 67 additions & 0 deletions hummingbot/connector/exchange/wazirx/wazirx_auth.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import hashlib
import hmac

# import time
import json
from collections import OrderedDict
from typing import Any, Dict
from urllib.parse import urlencode

from hummingbot.connector.time_synchronizer import TimeSynchronizer
from hummingbot.core.web_assistant.auth import AuthBase
from hummingbot.core.web_assistant.connections.data_types import RESTMethod, RESTRequest, WSRequest


class WazirxAuth(AuthBase):
def __init__(self, api_key: str, secret_key: str, time_provider: TimeSynchronizer):
self.api_key = api_key
self.secret_key = secret_key
self.time_provider = time_provider

async def rest_authenticate(self, request: RESTRequest) -> RESTRequest:
"""
Adds the server time and the signature to the request, required for authenticated interactions. It also adds
the required parameter in the request header.
:param request: the request to be configured for authenticated interaction
"""
if request.method == RESTMethod.POST and request.data is not None:
request.data = self.add_auth_to_params(params=json.loads(request.data))
else:
request.params = self.add_auth_to_params(params=request.params)

headers = {}
if request.headers is not None:
headers.update(request.headers)
headers.update(self.header_for_authentication())
request.headers = headers

return request

async def ws_authenticate(self, request: WSRequest) -> WSRequest:
"""
This method is intended to configure a websocket request to be authenticated. Wazirx does not use this
functionality
"""
return request # pass-through

def add_auth_to_params(self,
params: Dict[str, Any]):
timestamp = int(self.time_provider.time() * 1e3)

request_params = OrderedDict(params or {})
request_params["timestamp"] = timestamp

signature = self._generate_signature(params=request_params)
request_params["signature"] = signature

return request_params

def header_for_authentication(self) -> Dict[str, str]:
return {"X-API-KEY": self.api_key}

def _generate_signature(self, params: Dict[str, Any]) -> str:

encoded_params_str = urlencode(params)
signature = hmac.new(bytes(self.secret_key, 'latin-1'), msg=bytes(encoded_params_str, 'latin-1'),
digestmod=hashlib.sha256).hexdigest()
return signature
83 changes: 83 additions & 0 deletions hummingbot/connector/exchange/wazirx/wazirx_constants.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from hummingbot.core.api_throttler.data_types import RateLimit
from hummingbot.core.data_type.common import OrderType
from hummingbot.core.data_type.in_flight_order import OrderState

DEFAULT_DOMAIN = "wazirx"

HBOT_ORDER_ID_PREFIX = "x-XEKWYICX"
MAX_ORDER_ID_LEN = 32

# Base URL
REST_URL = "https://api.wazirx.com/sapi/"
WSS_URL = "wss://stream.wazirx.com/stream"

PUBLIC_API_VERSION = "v1"
PRIVATE_API_VERSION = "v1"

# Public API endpoints or WazirxClient function
TICKER_PRICE_CHANGE_PATH_URL = "/ticker/24hr"
TICKER_BOOK_PATH_URL = "/tickers/24hr"
EXCHANGE_INFO_PATH_URL = "/exchangeInfo"
PING_PATH_URL = "/ping"
SNAPSHOT_PATH_URL = "/depth"
SERVER_TIME_PATH_URL = "/time"

# Private API endpoints or WazirxClient function
ACCOUNTS_PATH_URL = "/funds"
MY_TRADES_PATH_URL = "/myTrades"
ORDER_PATH_URL = "/order"
WAZIRX_USER_STREAM_PATH_URL = "/create_auth_token"

WS_HEARTBEAT_TIME_INTERVAL = 30
PING_TIMEOUT = 10

ORDER_TYPE_MAP = {
OrderType.LIMIT: "limit",
OrderType.LIMIT_MAKER: "limit_maker"
}
# Wazirx params

SIDE_BUY = "buy"
SIDE_SELL = "sell"

# Rate Limit time intervals
ONE_MINUTE = 60
ONE_SECOND = 1
ONE_DAY = 86400

MAX_REQUEST = 5000

# Order States
ORDER_STATE = {
"idle": OrderState.PENDING_CREATE,
"wait": OrderState.PARTIALLY_FILLED,
"done": OrderState.FILLED,
"cancel": OrderState.CANCELED,
"failed": OrderState.FAILED
}

# Websocket event types
DIFF_EVENT_TYPE = "{}@depth10@100ms"
TRADE_EVENT_TYPE = "{}@trades"

USER_TRADES_ENDPOINT_NAME = "ownTrade"
USER_ORDERS_ENDPOINT_NAME = "orderUpdate"
USER_BALANCE_ENDPOINT_NAME = "outboundAccountPosition"

RATE_LIMITS = [
RateLimit(limit_id=PING_PATH_URL, limit=1, time_interval=ONE_SECOND),
RateLimit(limit_id=EXCHANGE_INFO_PATH_URL, limit=1, time_interval=ONE_SECOND),
RateLimit(limit_id=TICKER_PRICE_CHANGE_PATH_URL, limit=1, time_interval=ONE_SECOND),
RateLimit(limit_id=TICKER_BOOK_PATH_URL, limit=1, time_interval=ONE_SECOND),
RateLimit(limit_id=SNAPSHOT_PATH_URL, limit=1, time_interval=ONE_SECOND),
RateLimit(limit_id=WAZIRX_USER_STREAM_PATH_URL, limit=1, time_interval=ONE_SECOND),
RateLimit(limit_id=SERVER_TIME_PATH_URL, limit=1, time_interval=ONE_SECOND),
RateLimit(limit_id=ACCOUNTS_PATH_URL, limit=10, time_interval=ONE_SECOND),
RateLimit(limit_id=MY_TRADES_PATH_URL, limit=2, time_interval=ONE_SECOND),
RateLimit(limit_id=ORDER_PATH_URL, limit=10, time_interval=ONE_SECOND),
]

ORDER_NOT_EXIST_ERROR_CODE = -2013
ORDER_NOT_EXIST_MESSAGE = "Order does not exist"
UNKNOWN_ORDER_ERROR_CODE = -2011
UNKNOWN_ORDER_MESSAGE = "Unknown order sent"
Loading