#!/usr/bin/env python3
# -*- coding: utf-8 -*-
#
# File: unicorn_binance_websocket_api/connection.py
#
# Part of ‘UNICORN Binance WebSocket API’
# Project website: https://www.lucit.tech/unicorn-binance-websocket-api.html
# Github: https://github.com/LUCIT-Systems-and-Development/unicorn-binance-websocket-api
# Documentation: https://unicorn-binance-websocket-api.docs.lucit.tech
# PyPI: https://pypi.org/project/unicorn-binance-websocket-api
# LUCIT Online Shop: https://shop.lucit.services/software
#
# License: LSOSL - LUCIT Synergetic Open Source License
# https://github.com/LUCIT-Systems-and-Development/unicorn-binance-websocket-api/blob/master/LICENSE
#
# Author: LUCIT Systems and Development
#
# Copyright (c) 2019-2023, LUCIT Systems and Development (https://www.lucit.tech)
# All rights reserved.
from unicorn_binance_websocket_api.exceptions import StreamRecoveryError, Socks5ProxyConnectionError
from urllib.parse import urlparse
import asyncio
import copy
import logging
import socket
import socks # PySocks https://pypi.org/project/PySocks/
import ssl
import sys
import time
import websockets
connect = websockets.connect
logger = logging.getLogger("unicorn_binance_websocket_api")
[docs]
class BinanceWebSocketApiConnection(object):
def __init__(self,
manager,
stream_id,
socket_id,
channels,
markets,
symbols):
self.manager = manager
self.stream_id = copy.deepcopy(stream_id)
self.socket_id = copy.deepcopy(socket_id)
self.api_key = copy.deepcopy(self.manager.stream_list[self.stream_id]['api_key'])
self.api_secret = copy.deepcopy(self.manager.stream_list[self.stream_id]['api_secret'])
self.ping_interval = copy.deepcopy(self.manager.stream_list[self.stream_id]['ping_interval'])
self.ping_timeout = copy.deepcopy(self.manager.stream_list[self.stream_id]['ping_timeout'])
self.close_timeout = copy.deepcopy(self.manager.stream_list[self.stream_id]['close_timeout'])
self.channels = copy.deepcopy(channels)
self.markets = copy.deepcopy(markets)
self.symbols = copy.deepcopy(symbols)
self.api = copy.deepcopy(self.manager.stream_list[self.stream_id]['api'])
self.add_timeout = True if "!userData" in str(str(channels) + str(markets)) or self.api is True else False
if self.add_timeout:
logger.debug(f"BinanceWebSocketApiConnection.receive({str(self.stream_id)}) socket_id="
f"{str(self.socket_id)}) - Adding timeout to `websocket.recv()` ")
async def __aenter__(self):
if self.manager.is_stop_request(self.stream_id):
self.manager.stream_is_stopping(self.stream_id)
sys.exit(0)
uri = self.manager.create_websocket_uri(self.channels,
self.markets,
self.stream_id,
symbols=self.symbols,
api=self.manager.stream_list[self.stream_id]['api'])
if uri is False:
# cant get a valid URI, so this stream has to crash
error_msg = "Probably no internet connection?"
logger.critical("BinanceWebSocketApiConnection.await._conn.__aenter__(" + str(self.stream_id) + ", " +
str(self.channels) + ", " + str(self.markets) + ") - " + " error: 5 - " + str(error_msg))
self.manager.stream_is_crashing(self.stream_id, str(error_msg))
self.manager.set_restart_request(self.stream_id)
sys.exit(1)
else:
self.manager.stream_list[self.stream_id]['websocket_uri'] = uri
try:
if isinstance(uri, dict):
# dict = error, string = valid url
if uri['code'] == -1102 or \
uri['code'] == -2008 or \
uri['code'] == -2014 or \
uri['code'] == -2015 or \
uri['code'] == -11001:
# -1102 = Mandatory parameter 'symbol' was not sent, was empty/null, or malformed.
# -2008 = Invalid Api-Key ID
# -2014 = API-key format invalid
# -2015 = Invalid API-key, IP, or permissions for action
# -11001 = Isolated margin account does not exist.
# Can not get a valid listen_key, so this stream has to crash:
logger.critical("BinanceWebSocketApiConnection.await._conn.__aenter__(" + str(self.stream_id) +
", " + str(self.channels) + ", " + str(self.markets) + ") - error: 4 - " +
str(uri['msg']))
try:
del self.manager.restart_requests[self.stream_id]
except KeyError as error_msg:
logger.critical("BinanceWebSocketApiConnection.await._conn.__aenter__(" + str(self.stream_id) +
", " + str(self.channels) + ", " + str(self.markets) + ") - error: 6 - "
+ str(error_msg))
except TypeError as error_msg:
logger.critical("BinanceWebSocketApiConnection.await._conn.__aenter__(" + str(self.stream_id) +
", " + str(self.channels) + ", " + str(self.markets) + ") - error: 3 - "
+ str(error_msg))
else:
logger.critical("BinanceWebSocketApiConnection.await._conn.__aenter__(" + str(self.stream_id) +
", " + str(self.channels) + ", " + str(self.markets) + ") - Received unknown"
" error msg from Binance: " + str(uri['msg']))
self.manager.stream_is_crashing(self.stream_id, str(uri['msg']))
if self.manager.throw_exception_if_unrepairable:
self.manager.socket_is_ready[self.stream_id] = True
raise StreamRecoveryError("stream_id " + str(self.stream_id) + ": " + str(uri))
sys.exit(1)
except KeyError as error_msg:
logger.critical("BinanceWebSocketApiConnection.await._conn.__aenter__(" + str(self.stream_id) +
", " + str(self.channels) + ", " + str(self.markets) + ") - error: 1 - "
+ str(error_msg))
if self.manager.socks5_proxy_address is None or self.manager.socks5_proxy_port is None:
self._conn = connect(uri,
ping_interval=self.ping_interval,
ping_timeout=self.ping_timeout,
close_timeout=self.close_timeout,
extra_headers={'User-Agent': str(self.manager.get_user_agent())})
logger.info(f"BinanceWebSocketApiConnection.await._conn.__aenter__(\"{self.stream_id}, {self.channels}"
f", {self.markets}\") - No proxy used!")
else:
websocket_socks5_proxy = socks.socksocket()
websocket_socks5_proxy.set_proxy(proxy_type=socks.SOCKS5,
addr=self.manager.socks5_proxy_address,
port=int(self.manager.socks5_proxy_port),
username=self.manager.socks5_proxy_user,
password=self.manager.socks5_proxy_pass)
netloc = urlparse(self.manager.websocket_base_uri).netloc
try:
host, port = netloc.split(":")
except ValueError as error_msg:
logger.debug(f"'netloc' split error: {netloc} - {error_msg}")
host = netloc
port = 443
try:
logger.info(f"Connect to socks5 proxy {host}:{port} (ssl_verification: "
f"{self.manager.socks5_proxy_ssl_verification})")
websocket_socks5_proxy.connect((host, int(port)))
websocket_server_hostname = netloc
except socks.ProxyConnectionError as error_msg:
error_msg = f"{error_msg} ({host}:{port})"
logger.critical(error_msg)
raise Socks5ProxyConnectionError(error_msg)
except socks.GeneralProxyError as error_msg:
error_msg = f"{error_msg} ({host}:{port})"
logger.critical(error_msg)
raise Socks5ProxyConnectionError(error_msg)
self._conn = connect(uri,
ssl=self.manager.websocket_ssl_context,
sock=websocket_socks5_proxy,
server_hostname=websocket_server_hostname,
ping_interval=self.ping_interval,
ping_timeout=self.ping_timeout,
close_timeout=self.close_timeout,
extra_headers={'User-Agent': str(self.manager.get_user_agent())})
logger.info(f"BinanceWebSocketApiConnection.await._conn.__aenter__(\"{self.stream_id}, {self.channels}"
f", {self.markets}\") - Using proxy: {self.manager.socks5_proxy_address} "
f"{self.manager.socks5_proxy_port} SSL: {self.manager.socks5_proxy_ssl_verification}")
try:
try:
self.manager.websocket_list[self.stream_id] = await self._conn.__aenter__()
except websockets.exceptions.InvalidMessage as error_msg:
logger.error("BinanceWebSocketApiConnection.await._conn.__aenter__(" + str(self.stream_id) +
", " + str(self.channels) + ", " + str(self.markets) + ") - InvalidMessage error_msg: " +
str(error_msg))
self.manager.stream_is_crashing(self.stream_id, str(error_msg))
time.sleep(2)
self.manager.set_restart_request(self.stream_id)
sys.exit(1)
except websockets.exceptions.InvalidStatusCode as error_msg:
if "HTTP 429" in str(error_msg):
logger.error("BinanceWebSocketApiConnection.await._conn.__aenter__(" + str(self.stream_id) +
", " + str(self.channels) + ", " + str(self.markets) + ") InvalidStatusCode-HTTP429" +
str(error_msg))
self.manager.stream_is_crashing(self.stream_id, str(error_msg))
time.sleep(2)
self.manager.set_restart_request(self.stream_id)
sys.exit(1)
else:
logger.error("BinanceWebSocketApiConnection.await._conn.__aenter__(" + str(self.stream_id) +
", " + str(self.channels) + ", " + str(self.markets) + ") - InvalidStatusCode" +
" error_msg: " + str(error_msg))
self.manager.stream_list[self.stream_id]['status'] = "running"
self.manager.stream_list[self.stream_id]['has_stopped'] = False
try:
if self.manager.restart_requests[self.stream_id]['status'] == "restarted":
self.manager.increase_reconnect_counter(self.stream_id)
del self.manager.restart_requests[self.stream_id]
except KeyError:
pass
self.manager.set_heartbeat(self.stream_id)
self.manager.process_stream_signals("CONNECT", self.stream_id)
self.manager.stream_list[self.stream_id]['last_stream_signal'] = "CONNECT"
except websockets.exceptions.NegotiationError as error_msg:
logger.error("BinanceWebSocketApiConnection.await._conn.__aenter__(" + str(self.stream_id) + ", " +
str(self.channels) + ", " + str(self.markets) + ")" + " - NegotiationError - " +
"error_msg: " + str(error_msg))
except ConnectionResetError as error_msg:
logger.error("BinanceWebSocketApiConnection.await._conn.__aenter__(" + str(self.stream_id) + ", " +
str(self.channels) + ", " + str(self.markets) + ")" + " - ConnectionResetError - " +
"error_msg: " + str(error_msg))
except socket.gaierror as error_msg:
logger.critical("BinanceWebSocketApiConnection.await._conn.__aenter__(" + str(self.stream_id) + ", " +
str(self.channels) + ", " + str(self.markets) + ")" + " - No internet connection? "
"- error_msg: " + str(error_msg) + ": " + self.manager.websocket_base_uri)
self.manager.stream_is_crashing(self.stream_id, " - No internet connection? "
"- error_msg: " + str(error_msg) + ": " + self.manager.websocket_base_uri)
self.manager.set_restart_request(self.stream_id)
sys.exit(1)
except OSError as error_msg:
logger.critical("BinanceWebSocketApiConnection.await._conn.__aenter__(" + str(self.stream_id) + ", " +
str(self.channels) + ", " + str(self.markets) + ")" + " - OSError - error_msg: " +
str(error_msg))
self.manager.stream_is_crashing(self.stream_id, (str(error_msg)))
self.manager.set_restart_request(self.stream_id)
sys.exit(1)
except websockets.exceptions.InvalidStatusCode as error_msg:
if "Status code not 101: 414" in str(error_msg):
# Since we subscribe via websocket.send() and not with URI anymore, this is obsolete code I guess.
self.manager.stream_is_crashing(self.stream_id, str(error_msg) + " --> URI too long?")
logger.critical("BinanceWebSocketApiConnection.await._conn.__aenter__(" + str(self.stream_id) + ", " +
str(self.channels) + ", " + str(self.markets) + ")" + " - URI Too Long? - error_msg: "
+ str(error_msg))
try:
self.manager.websocket_list[self.stream_id].close()
except KeyError:
pass
sys.exit(1)
elif "Status code not 101: 400" in str(error_msg):
logger.critical("BinanceWebSocketApiConnection.await._conn.__aenter__(" + str(self.stream_id) + ", " +
str(self.channels) + ", " + str(self.markets) + ") - error_msg: " + str(error_msg))
elif "Status code not 101: 429" in str(error_msg):
logger.error("BinanceWebSocketApiConnection.await._conn.__aenter__(" + str(self.stream_id) + ", " +
str(self.channels) + ", " + str(self.markets) + ") - error_msg: " + str(error_msg))
self.manager.stream_is_crashing(self.stream_id, str(error_msg))
time.sleep(2)
self.manager.set_restart_request(self.stream_id)
sys.exit(1)
elif "Status code not 101: 500" in str(error_msg):
logger.critical("BinanceWebSocketApiConnection.await._conn.__aenter__(" + str(self.stream_id) + ", " +
str(self.channels) + ", " + str(self.markets) + ") - error_msg: " + str(error_msg))
self.manager.stream_is_crashing(self.stream_id, str(error_msg))
sys.exit(1)
else:
logger.error("BinanceWebSocketApiConnection.await._conn.__aenter__(" + str(self.stream_id) + ", " +
str(self.channels) + ", " + str(self.markets) + ") - error_msg: " + str(error_msg))
try:
self.manager.websocket_list[self.stream_id].close()
except KeyError:
pass
self.manager.stream_is_crashing(self.stream_id, str(error_msg))
self.manager.set_restart_request(self.stream_id)
sys.exit(1)
except websockets.exceptions.ConnectionClosed as error_msg:
logger.error("BinanceWebSocketApiConnection.await._conn.__aenter__(" + str(self.stream_id) + ", " +
str(self.channels) + ", " + str(self.markets) + ") - Exception ConnectionClosed"
" - error_msg: " + str(error_msg))
if "WebSocket connection is closed: code = 1006" in str(error_msg):
self.manager.websocket_list[self.stream_id].close()
self.manager.stream_is_crashing(self.stream_id, str(error_msg))
sys.exit(1)
else:
logger.critical(f"BinanceWebSocketApiConnection.await._conn.__aenter__({self.stream_id}, "
f"{self.channels}, {self.markets}) - UnhandledException ConnectionClosed - {error_msg}")
return self
async def __aexit__(self, *args, **kwargs):
try:
await self._conn.__aexit__(*args, **kwargs)
except RuntimeError as error_msg:
logger.debug(f"BinanceWebSocketApiConnection.__aexit__({self.stream_id}) - RuntimeError - {error_msg}")
self.manager.stream_is_stopping(self.stream_id)
if self.manager.is_stop_request(self.stream_id) is False and \
self.manager.is_stop_as_crash_request is False:
self.manager.set_restart_request(self.stream_id)
except AttributeError as error_msg:
logger.error(f"BinanceWebSocketApiConnection.__aexit__({self.stream_id}) - AttributeError - {error_msg}")
except websockets.exceptions.ConnectionClosed as error_msg:
logger.error(f"BinanceWebSocketApiConnection.__aexit__({self.stream_id}) - ConnectionClosed - {error_msg}")
self.manager.stream_is_stopping(self.stream_id)
if self.manager.is_stop_request(self.stream_id) is False and \
self.manager.is_stop_as_crash_request is False:
self.manager.set_restart_request(self.stream_id)
sys.exit(1)
[docs]
async def close(self):
if self.manager.is_stop_as_crash_request(self.stream_id) is False:
self.manager.stream_is_stopping(self.stream_id)
logger.info(f"BinanceWebSocketApiConnection.close({str(self.stream_id)})")
try:
await self.manager.websocket_list[self.stream_id].close()
except KeyError:
logger.error(f"BinanceWebSocketApiConnection.close({str(self.stream_id)}) - Stream not found!")
except RuntimeError as error_msg:
logger.error(f"BinanceWebSocketApiConnection.close({str(self.stream_id)}) - RuntimeError: {str(error_msg)}")
except ValueError as error_msg:
# ValueError: The future belongs to a different loop than the one specified as the loop argument
logger.error(f"BinanceWebSocketApiConnection.close({str(self.stream_id)}) socket_id="
f"{str(self.socket_id)}) - Closing this socket! - ValueError: {str(error_msg)}")
self.manager.stream_is_stopping(self.stream_id)
if self.manager.is_stop_request(self.stream_id) is False:
self.manager.set_restart_request(self.stream_id)
sys.exit(1)
[docs]
async def receive(self):
self.manager.set_heartbeat(self.stream_id)
if self.manager.is_stop_request(self.stream_id):
return False
try:
if self.add_timeout:
if self.api is True:
timeout = 0.1
else:
timeout = 1
received_data_json = await asyncio.wait_for(self.manager.websocket_list[self.stream_id].recv(),
timeout=timeout)
else:
received_data_json = await self.manager.websocket_list[self.stream_id].recv()
if self.manager.is_stop_request(self.stream_id):
return False
try:
if self.manager.restart_requests[self.stream_id]['status'] == "restarted":
self.manager.increase_reconnect_counter(self.stream_id)
del self.manager.restart_requests[self.stream_id]
except KeyError:
pass
if received_data_json is not None:
size = sys.getsizeof(str(received_data_json))
self.manager.increase_processed_receives_statistic(self.stream_id)
self.manager.add_total_received_bytes(size)
self.manager.increase_received_bytes_per_second(self.stream_id, size)
return received_data_json
except RuntimeError as error_msg:
logger.error("BinanceWebSocketApiConnection.receive(" +
str(self.stream_id) + ") - RuntimeError - error_msg: " + str(error_msg))
self.manager.stream_is_stopping(self.stream_id)
if self.manager.is_stop_request(self.stream_id) is False:
self.manager.set_restart_request(self.stream_id)
sys.exit(1)
except ssl.SSLError as error_msg:
logger.error("BinanceWebSocketApiConnection.receive(" +
str(self.stream_id) + ") - ssl.SSLError - error_msg: " + str(error_msg))
self.manager.stream_is_stopping(self.stream_id)
if self.manager.is_stop_request(self.stream_id) is False:
self.manager.set_restart_request(self.stream_id)
sys.exit(1)
except KeyError as error_msg:
logger.error("BinanceWebSocketApiConnection.receive(" +
str(self.stream_id) + ") - KeyError - error_msg: " + str(error_msg))
self.manager.stream_is_stopping(self.stream_id)
if self.manager.is_stop_request(self.stream_id) is False:
self.manager.set_restart_request(self.stream_id)
sys.exit(1)
except ValueError as error_msg:
# ValueError: The future belongs to a different loop than the one specified as the loop argument
logger.error(f"BinanceWebSocketApiConnection.receive({str(self.stream_id)}) socket_id="
f"{str(self.socket_id)}) - Closing this socket! - ValueError: {str(error_msg)}")
self.manager.stream_is_stopping(self.stream_id)
if self.manager.is_stop_request(self.stream_id) is False:
self.manager.set_restart_request(self.stream_id)
sys.exit(1)
[docs]
async def send(self, data):
self.manager.set_heartbeat(self.stream_id)
try:
await self.manager.websocket_list[self.stream_id].send(data)
self.manager.increase_transmitted_counter(self.stream_id)
except websockets.exceptions.ConnectionClosed as error_msg:
logger.error("BinanceWebSocketApiConnection.send(" + str(self.stream_id) + ", " +
str(self.channels) + ", " + str(self.markets) + ") - Exception ConnectionClosed "
"- error_msg: " + str(error_msg))
self.manager.stream_is_crashing(self.stream_id, str(error_msg))
self.manager.set_restart_request(self.stream_id)
sys.exit(1)
except RuntimeError as error_msg:
logger.error("BinanceWebSocketApiConnection.send(" + str(self.stream_id) + ", " +
str(self.channels) + ", " + str(self.markets) + ") - Exception RuntimeError "
"- error_msg: " + str(error_msg))
self.manager.stream_is_crashing(self.stream_id, str(error_msg))
self.manager.set_restart_request(self.stream_id)
sys.exit(1)
except IndexError as error_msg:
logger.error("BinanceWebSocketApiConnection.send(" + str(self.stream_id) + ", " +
str(self.channels) + ", " + str(self.markets) + ") - Exception IndexError "
"- error_msg: " + str(error_msg))
except KeyError as error_msg:
logger.error("BinanceWebSocketApiConnection.send(" + str(self.stream_id) + ", " +
str(self.channels) + ", " + str(self.markets) + ") - Exception KeyError "
"- error_msg: " + str(error_msg))
except ValueError as error_msg:
# ValueError: The future belongs to a different loop than the one specified as the loop argument
logger.error(f"BinanceWebSocketApiConnection.send({str(self.stream_id)}) socket_id="
f"{str(self.socket_id)}) - Closing this socket! - ValueError: {str(error_msg)}")
self.manager.stream_is_stopping(self.stream_id)
if self.manager.is_stop_request(self.stream_id) is False:
self.manager.set_restart_request(self.stream_id)
sys.exit(1)