Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 44 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
## Welcome to the python-currencycom

This is an unofficial Python wrapper for the Currency.com exchange REST API v1.
This is an unofficial Python wrapper for the Currency.com exchange REST API v1 and Websockets API.
I am in no way affiliated with Currency.com, use at your own risk.

### Documentation
Expand All @@ -24,7 +24,7 @@ Let's retrieve tradable symbols on the market
```python
from pprint import pprint

from currencycom.client import Client
from currencycom.client import CurrencycomClient as Client

client = Client('API_KEY', 'SECRET_KEY')

Expand All @@ -35,4 +35,46 @@ pprint(tradable_symbols,
indent=2)
```

### Hybrid = Websockets + REST API

Python3.6+ is required for the websockets support

```python
import time
import asyncio

from pprint import pprint

from currencycom.hybrid import CurrencycomHybridClient


def your_handler(message):
pprint(message, indent=2)


async def keep_waiting():
while True:
await asyncio.sleep(20)


client = CurrencycomHybridClient(api_key='YOUR_API_KEY', api_secret='YOUR_API_SECRET',
handler=your_handler, demo=True)

# Subscribe to market data
client.subscribe("BTC/USD_LEVERAGE", "ETH/USD_LEVERAGE")

# Run the client in a thread
client.run()
time.sleep(3)

# Also you can use REST API
pprint(client.rest.get_24h_price_change("BTC/USD_LEVERAGE"))

loop = asyncio.get_event_loop()
loop.run_until_complete(keep_waiting())
```

Default symbol price handler is provided for you, you can use it or write your own.

For more check out [the documentation](https://exchange.currency.com/api) and [Swagger](https://apitradedoc.currency.com/swagger-ui.html#/).

Empty file added currencycom/asyncio/__init__.py
Empty file.
241 changes: 241 additions & 0 deletions currencycom/asyncio/websockets.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
import asyncio
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's avoid adding packages with the same name as default asyncio package

import json
import logging
import time
import websockets

from random import random
from datetime import datetime
from typing import Optional

from ..client import CurrencycomClient


class ReconnectingWebsocket:
MAX_RECONNECTS = 5
MAX_RECONNECT_SECONDS = 60
MIN_RECONNECT_WAIT = 0.5
TIMEOUT = 10
PING_TIMEOUT = 5

def __init__(self, loop, client, coro):
self._loop = loop
self._log = logging.getLogger(__name__)
self._coro = coro
self._reconnect_attempts = 0
self._conn = None
self._connect_id = None
self._socket = None
self._request = {
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's move this var to send_message as it shouldn't be stored across the object

"destination": 'ping',
"correlationId": 0,
"payload": {}
}
self._client: CurrencycomClient = client
self._last_ping = None

self._connect()

def _connect(self):
self._conn = asyncio.ensure_future(self._run(), loop=self._loop)

async def _run(self):
keep_waiting = True
self._last_ping = time.time()

async with websockets.connect(self._client.constants.BASE_WSS_URL) as socket:
self._socket = socket
self._reconnect_attempts = 0

try:
while keep_waiting:
if time.time() - self._last_ping > self.PING_TIMEOUT:
await self.send_ping()
try:
evt = await asyncio.wait_for(self._socket.recv(), timeout=self.PING_TIMEOUT)
except asyncio.TimeoutError:
self._log.debug("Ping timeout in {} seconds".format(self.PING_TIMEOUT))
await self.send_ping()
except asyncio.CancelledError:
self._log.debug("Websocket cancelled error")
await self._socket.ping()
else:
try:
evt_obj = json.loads(evt)
except ValueError:
pass
else:
await self._coro(evt_obj)
except websockets.ConnectionClosed:
keep_waiting = False
await self._reconnect()
except Exception as e:
self._log.debug('Websocket exception:{}'.format(e))
keep_waiting = False
await self._reconnect()

async def _reconnect(self):
await self.cancel()
self._reconnect_attempts += 1
if self._reconnect_attempts < self.MAX_RECONNECTS:
self._log.debug(f"Websocket reconnecting {self.MAX_RECONNECTS - self._reconnect_attempts} attempts left")
reconnect_wait = self._get_reconnect_wait(self._reconnect_attempts)
await asyncio.sleep(reconnect_wait)
self._connect()
else:
self._log.error(f"Websocket could not reconnect after {self._reconnect_attempts} attempts")
pass

def _get_reconnect_wait(self, attempts):
expo = 2 ** attempts
return round(random() * min(self.MAX_RECONNECT_SECONDS, expo - 1) + 1)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do you do random there?


async def send_message(self, destination, payload, access: Optional[str] = None, retry_count=0):
if not self._socket:
if retry_count < 5:
await asyncio.sleep(1)
await self.send_message(destination, payload, access, retry_count + 1)
else:
self._request["destination"] = destination
self._request["payload"] = payload
self._request["correlationId"] += 1

if access == 'private':
self._log.error('Private access not implemented')

message = json.dumps(self._request)
await self._socket.send(message)

async def send_ping(self):
await self.send_message('ping', {}, access='public')
self._last_ping = time.time()

async def cancel(self):
try:
self._conn.cancel()
except asyncio.CancelledError:
pass


class CurrencycomSocketManager:
"""
A class to manage the websocket connection to Currencycom.

Use the following methods to subscribe to Currencycom events:
- subscribe_market_data(symbols)
- subscribe_depth_market_data(symbols)
- subscribe_OHLC_market_data(symbols)
- subscribe_trades(symbols)
"""

def __init__(self):
"""
Initialise the Currencycom Socket Manager
"""
self._callback = None
self._conn: Optional[ReconnectingWebsocket] = None
self._loop = None
self._client = None
self._log = logging.getLogger(__name__)

@classmethod
async def create(cls, loop, client, callback):
self = CurrencycomSocketManager()
self._loop = loop
self._client = client
self._callback = callback
self._conn = ReconnectingWebsocket(loop, client, self._callback)
return self

async def subscribe_market_data(self, symbols: [str]):
"""
Market data stream

This subscription produces the following events:
{
"status":"OK",
"Destination":"internal.quote",
"Payload":{
"symbolName":"TXN",
"bid":139.85,
"bidQty":2500,
"ofr":139.92000000000002,
"ofrQty":2500,
"timestamp":1597850971558
}
}
"""
await self._conn.send_message("marketData.subscribe", {"symbols": symbols}, 'public')

async def subscribe_depth_market_data(self, symbols: [str]):
"""
Depth market data stream

This subscription produces the following events:
{
"status":"OK",
"Destination":"marketdepth.event",
"Payload":{
"Data":{
"ts":1597849462575,
"Bid":{
"2":25,
"1.94":25.9
},
"Ofr":{
"3.3":1,
"2.627":6.1
}
},
"symbol":"Natural Gas"
}
}
"""
await self._conn.send_message("depthMarketData.subscribe", {"symbols": symbols})

async def subscribe_OHLC_market_data(self, intervals: [str], symbols: [str]):
"""
OHLC market data stream

This subscription produces the following events:
{
"status":"OK",
"correlationId":"2",
"payload":{
"status":"OK",
"Destination":"ohlc.event",
"Payload":{
"interval":"1m",
"symbol":"TS",
"T":1597850100000,
"H":11.89,
"L":11.88,
"O":11.89,
"C":11.89
}
}
}
"""
await self._conn.send_message("OHLCMarketData.subscribe", {"intervals": intervals, "symbols": symbols})

async def subscribe_trades(self, symbols: [str]):
"""
Trades stream

This subscription produces the following events:
{
"status":"OK",
"destination":"internal.trade",
"payload":{
"price":11400.95,
"size":0.058,
"id":1616651347,
"ts":1596625079952,
"symbol":"BTC/USD",
"orderId":"00a02503-0079-54c4-0000-00004020316a",
"clientOrderId":"00a02503-0079-54c4-0000-482f00003a06",
"buyer":true
}
}
"""
await self._conn.send_message("trades.subscribe", {"symbols": symbols})
Loading