# capsul-flask/capsulflask/btcpay/client.py
#
# 82 lines
# 3.0 KiB
# Python

from capsulflask.btcpay.exceptions import *
from capsulflask.btcpay import key_utils
import requests
import json
import re
class Client:
    """Client for a BitPay-style invoice HTTP API.

    Signed requests carry an ``X-Identity`` header (the compressed public key
    derived from ``pem``) and an ``X-Signature`` header produced by
    ``key_utils.sign``. API-level failures are raised as ``BtcPayBtcPayError``,
    transport failures as ``BtcPayConnectionError`` and invalid invoice
    parameters as ``BtcPayArgumentError``.
    """

    # Fallback key for clients constructed without an explicit ``pem``.
    # Generated lazily on first use (instead of at import time) and then
    # reused, so such clients keep sharing a single identity per process.
    _default_pem = None

    def __init__(self, api_uri="https://bitpay.com", insecure=False, pem=None, tokens=None):
        """Create a client.

        Args:
            api_uri: Base URI of the API; request paths are appended to it.
            insecure: When true, TLS certificate verification is disabled.
            pem: PEM-encoded private key used for signing. When omitted, a
                key is generated once and shared by all clients that omit it.
            tokens: Mapping of API tokens. When omitted, each client gets its
                own empty dict (a ``{}`` default would be shared by all).
        """
        if pem is None:
            if Client._default_pem is None:
                Client._default_pem = key_utils.generate_pem()
            pem = Client._default_pem
        self.uri = api_uri
        self.verify = not insecure
        self.pem = pem
        self.client_id = key_utils.get_sin_from_pem(pem)
        self.tokens = {} if tokens is None else tokens
        self.user_agent = 'bitpay-python'

    def _signed_headers(self, signed_data):
        """Return request headers with this client's identity and a signature over ``signed_data``."""
        xidentity = key_utils.get_compressed_public_key_from_pem(self.pem)
        xsignature = key_utils.sign(signed_data, self.pem)
        return {
            "content-type": "application/json",
            'X-Identity': xidentity,
            'X-Signature': xsignature,
            'X-accept-version': '2.0.0',
        }

    def create_invoice(self, params):
        """Create an invoice by POSTing ``params`` as JSON to ``/invoices``.

        Args:
            params: Invoice fields; must contain ``'price'`` and ``'currency'``.

        Returns:
            The ``'data'`` member of the JSON response.

        Raises:
            BtcPayArgumentError: If price or currency fail validation.
            BtcPayConnectionError: If the HTTP request itself fails.
            BtcPayBtcPayError: If the server answers with a non-OK status.
        """
        self.verify_invoice_params(params['price'], params['currency'])
        payload = json.dumps(params)
        uri = self.uri + "/invoices"
        # The signature covers the full URI followed by the request body.
        headers = self._signed_headers(uri + payload)
        try:
            response = requests.post(uri, data=payload, headers=headers, verify=self.verify)
        except Exception as pro:
            raise BtcPayConnectionError(pro.args) from pro
        if response.ok:
            return response.json()['data']
        self.response_error(response)

    def get_invoice(self, invoice_id):
        """Fetch the invoice identified by ``invoice_id``.

        Returns:
            The ``'data'`` member of the JSON response.

        Raises:
            BtcPayConnectionError: If the HTTP request itself fails.
            BtcPayBtcPayError: If the server answers with a non-OK status.
        """
        uri = self.uri + "/invoices/" + invoice_id
        # A GET has no body, so only the URI is signed.
        headers = self._signed_headers(uri)
        try:
            response = requests.get(uri, headers=headers, verify=self.verify)
        except Exception as pro:
            raise BtcPayConnectionError(pro.args) from pro
        if response.ok:
            return response.json()['data']
        self.response_error(response)

    def verify_invoice_params(self, price, currency):
        """Validate invoice parameters before they are sent.

        Args:
            price: Any value accepted by ``float()``.
            currency: Exactly three upper-case ASCII letters, e.g. ``"USD"``.

        Raises:
            BtcPayArgumentError: If either value is invalid.
        """
        # fullmatch, unlike match with "$", also rejects a trailing newline.
        if re.fullmatch(r"[A-Z]{3}", currency) is None:
            raise BtcPayArgumentError("Currency is invalid.")
        try:
            float(price)
        except (TypeError, ValueError):
            raise BtcPayArgumentError("Price must be formatted as a float")

    def response_error(self, response):
        """Raise ``BtcPayBtcPayError`` describing a failed ``response``.

        The message is ``"<status code>: <detail>"``, where the detail is the
        ``'error'`` member of the JSON body when there is one and the raw
        response text otherwise.
        """
        error_message = response.text
        try:
            error_json = json.loads(error_message)
            error_message = error_json['error']
        except (ValueError, KeyError, TypeError):
            # Body is not JSON, or not an object with an 'error' member:
            # fall back to the raw text.
            pass
        raise BtcPayBtcPayError('%(code)d: %(message)s' % {'code': response.status_code, 'message': error_message})

    def unsigned_request(self, path, payload=None):
        """
        Generic unsigned request wrapper.

        Passing a truthy payload does a POST with the payload encoded as
        JSON; otherwise a GET is issued. Returns the raw response object
        without checking its status.

        Raises:
            BtcPayConnectionError: If the HTTP request itself fails.
        """
        headers = {"content-type": "application/json", "X-accept-version": "2.0.0"}
        try:
            if payload:
                response = requests.post(self.uri + path, verify=self.verify, data=json.dumps(payload), headers=headers)
            else:
                response = requests.get(self.uri + path, verify=self.verify, headers=headers)
        except Exception as pro:
            raise BtcPayConnectionError('Connection refused') from pro
        return response

    def unsigned_get_request(self, path, payload=None):
        """
        Deprecated, will be removed in 2.4

        NOTE(review): ``path`` is ignored; the request always goes to
        ``/tokens``. Kept as-is for backward compatibility.
        """
        return self.unsigned_request('/tokens', payload)