import stripe
import requests
import json
import math
import time
import decimal
import re
import sys

from time import sleep
from datetime import datetime, timedelta
from flask import Blueprint
from flask import make_response
from flask import request
from flask import current_app
from flask import session
from flask import redirect
from flask import url_for
from flask import jsonify
from flask import flash
from flask import render_template
from werkzeug.exceptions import abort

from capsulflask.auth import account_required
from capsulflask.db import get_model
from capsulflask.btcpay import client as btcpay
from capsulflask.shared import my_exec_info_message, get_account_balance, average_number_of_days_in_a_month

bp = Blueprint("payment", __name__, url_prefix="/payment")

# Characters accepted in a stripe checkout session id that arrives through the URL.
# Used with fullmatch() so that a trailing newline is not accepted.
_STRIPE_CHECKOUT_SESSION_ID_PATTERN = re.compile(r"[a-zA-Z0-9_=-]+")


def validate_dollars(min: float, max: float):
    """Parse and range-check the ``dollars`` field of the current request form.

    Args:
        min: smallest acceptable amount (inclusive).
        max: upper bound for the amount (exclusive).

    Returns:
        A ``(dollars, errors)`` tuple. ``dollars`` is the parsed float, or
        ``None`` when the field is missing or is not a number. ``errors`` is a
        list of human-readable messages; it is empty when the value is valid.
    """
    errors = list()
    dollars = None
    if "dollars" not in request.form:
        errors.append("dollars is required")
    else:
        try:
            dollars = float(request.form["dollars"])
        except (TypeError, ValueError):
            errors.append("dollars must be a number")
        else:
            # float() happily parses "nan", and NaN compares False against every
            # bound, so without this check it would slip through as a valid amount.
            if math.isnan(dollars):
                dollars = None
                errors.append("dollars must be a number")

        #current_app.logger.info(f"{str(dollars)} {str(min)} {str(dollars < min)}")

        if dollars is not None and dollars < min:
            errors.append(f"dollars must be {format(min, '.2f')} or more")
        elif dollars is not None and dollars >= max:
            errors.append(f"dollars must be less than {format(max, '.2f')}")

    current_app.logger.info(f"{len(errors)} {errors}")
    return (dollars, errors)


@bp.route("/btcpay", methods=("GET", "POST"))
@account_required
def btcpay_payment():
    """Show the BTCPay payment form and, on POST, create a BTCPay invoice.

    Redirects to the account balance page when BTCPay is disabled
    (``BTCPAY_URL`` is empty), when the BTCPay client cannot be (re)created, or
    when creating the invoice fails. On a valid POST a payment session is
    recorded and the user is redirected to the invoice URL. Otherwise any
    validation errors are flashed and ``btcpay.html`` is rendered.
    """
    errors = list()

    if current_app.config['BTCPAY_URL'] == "":
        flash("BTCPay is not enabled on this server")
        return redirect(url_for("console.account_balance"))
    elif 'BTCPAY_CLIENT' not in current_app.config:
        if not try_reconnnect_btcpay(current_app):
            flash("can't contact the BTCPay server right now")
            return redirect(url_for("console.account_balance"))

    if request.method == "POST":
        dollars, errors = validate_dollars(0.01, 1000.0)

        #current_app.logger.info(f"{len(errors)} {errors}")

        if len(errors) == 0:
            try:
                invoice = current_app.config['BTCPAY_CLIENT'].create_invoice(dict(
                    price=float(dollars),
                    currency="USD",
                    itemDesc="Capsul Cloud Compute",
                    transactionSpeed="high",
                    redirectURL=f"{current_app.config['BASE_URL']}/console/account-balance",
                    notificationURL=f"{current_app.config['BASE_URL']}/payment/btcpay/webhook"
                ))
            except Exception:
                # Best effort: any failure talking to BTCPay is logged and reported
                # to the user instead of producing an error page.
                current_app.logger.error(f"An error occurred while attempting to reach BTCPay Server: {my_exec_info_message(sys.exc_info())}")
                flash("An error occurred while attempting to reach BTCPay Server.")
                return redirect(url_for("console.account_balance"))

            current_app.logger.info(f"created btcpay invoice: {invoice}")

            # print(invoice)
            current_app.logger.info(f"created btcpay invoice_id={invoice['id']} ( {session['account']}, ${dollars} )")

            get_model().create_payment_session("btcpay", invoice["id"], session["account"], dollars)

            return redirect(invoice["url"])

    for error in errors:
        flash(error)

    return render_template("btcpay.html")


def poll_btcpay_session(invoice_id):
    """Fetch an invoice from BTCPay and update local payment state to match it.

    Args:
        invoice_id: id of the BTCPay invoice to look up.

    Returns:
        A two-element list ``[http_status, message]``: ``503`` when BTCPay
        cannot be reached, ``400`` when the invoice currency is not USD, and
        ``200`` otherwise.
    """
    if 'BTCPAY_CLIENT' not in current_app.config:
        if not try_reconnnect_btcpay(current_app):
            return [503, "can't contact btcpay server"]

    invoice = None
    try:
        invoice = current_app.config['BTCPAY_CLIENT'].get_invoice(invoice_id)
    except Exception:
        current_app.logger.error(f" error was thrown when contacting btcpay server: {my_exec_info_message(sys.exc_info())}")
        return [503, "error was thrown when contacting btcpay server"]

    if invoice['currency'] != "USD":
        return [400, "invalid currency"]

    dollars = invoice['price']

    current_app.logger.info(f"poll_btcpay_session invoice_id={invoice_id}, status={invoice['status']} dollars={dollars}")

    if invoice['status'] in ("paid", "confirmed", "complete"):
        success_account = get_model().consume_payment_session("btcpay", invoice_id, dollars)

        if success_account:
            current_app.logger.info(f"{success_account} paid ${dollars} successfully (btcpay_invoice_id={invoice_id})")

    if invoice['status'] == "complete":
        resolved_invoice_email = get_model().btcpay_invoice_resolved(invoice_id, True)
        if resolved_invoice_email is not None:
            check_if_shortterm_flag_can_be_unset(resolved_invoice_email)
    elif invoice['status'] == "expired" or invoice['status'] == "invalid":
        get_model().btcpay_invoice_resolved(invoice_id, False)
        get_model().delete_payment_session("btcpay", invoice_id)

    return [200, "ok"]


def try_reconnnect_btcpay(app):
    """Try to create the BTCPay client and store it in ``app.config['BTCPAY_CLIENT']``.

    Args:
        app: the Flask application whose config holds ``BTCPAY_URL`` and
            ``BTCPAY_PRIVATE_KEY``.

    Returns:
        True when ``BTCPAY_URL`` answered with HTTP 200 and the client was
        created, False otherwise. Failures are logged as warnings, never raised.
    """
    try:
        response = requests.get(app.config['BTCPAY_URL'])
        if response.status_code == 200:
            app.config['BTCPAY_CLIENT'] = btcpay.Client(api_uri=app.config['BTCPAY_URL'], pem=app.config['BTCPAY_PRIVATE_KEY'])
            return True
        else:
            app.logger.warning(f"Can't reach BTCPAY_URL {app.config['BTCPAY_URL']}: Response status code: {response.status_code}. Capsul will work fine except cryptocurrency payments will not work.")
            return False
    except Exception:
        app.logger.warning("unable to create btcpay client. Capsul will work fine except cryptocurrency payments will not work. The error was: " + my_exec_info_message(sys.exc_info()))
        return False


@bp.route("/btcpay/webhook", methods=("POST",))
def btcpay_webhook():
    """Handle a BTCPay notification by re-polling the invoice it names.

    Returns the message and HTTP status produced by ``poll_btcpay_session``,
    or ``400`` when the request body is not JSON containing an ``id``.
    """
    current_app.logger.info("got btcpay webhook")

    # IMPORTANT! there is no signature or credential to authenticate the data sent into this webhook :facepalm:
    # its just a notification, thats all.
    # Because anyone can post here, a malformed body must produce a 400 rather than an unhandled exception.
    try:
        request_data = json.loads(request.data)
        invoice_id = request_data['id']
    except (ValueError, TypeError, KeyError):
        current_app.logger.info("/payment/btcpay/webhook returned 400: request body must be JSON with an id")
        return "bad request", 400

    # so you better make sure to get the invoice data directly from the horses mouth!
    result = poll_btcpay_session(invoice_id)

    return result[1], result[0]


# if the user has any shortterm vms (vms which were created at a time when they could not pay for that vm for 1 month)
# then it would be nice if those vms could be "promoted" to long-term if they pay up enough to cover costs for 1 month
def check_if_shortterm_flag_can_be_unset(email: str):
    """Clear the account's short-term flag if its balance one month from now is positive.

    Args:
        email: the account whose vms and payments are used for the projection.
    """
    vms = get_model().list_vms_for_account(email)
    payments = get_model().list_payments_for_account(email)
    balance = get_account_balance(vms, payments, datetime.utcnow() + timedelta(days=average_number_of_days_in_a_month))
    if balance > 0:
        get_model().clear_shortterm_flag(email)


@bp.route("/stripe", methods=("GET", "POST"))
@account_required
def stripe_payment():
    """Show the stripe payment form and, on POST, create a stripe checkout session.

    On a valid POST a payment session is recorded and the user is redirected
    to ``/payment/stripe/<checkout session id>``. Otherwise any validation
    errors are flashed and ``stripe.html`` is rendered.
    """
    stripe_checkout_session_id = None
    errors = list()

    if request.method == "POST":
        dollars, errors = validate_dollars(0.5, 1000000)

        if len(errors) == 0:
            current_app.logger.info(f"creating stripe checkout session for {session['account']}, ${dollars}")

            checkout_session = stripe.checkout.Session.create(
                success_url=current_app.config['BASE_URL'] + "/payment/stripe/success?session_id={CHECKOUT_SESSION_ID}",
                cancel_url=current_app.config['BASE_URL'] + "/payment/stripe",
                payment_method_types=["card"],
                #customer_email=session["account"],
                line_items=[
                    {
                        "name": "Capsul Cloud Compute",
                        "images": [current_app.config['BASE_URL']+"/static/capsul-product-image.png"],
                        "quantity": 1,
                        "currency": "usd",
                        # round before converting: 19.99 * 100 is 1998.9999999999998 as a float,
                        # and a bare int() would truncate that to 1998 cents.
                        "amount": int(round(dollars * 100))
                    }
                ]
            )
            stripe_checkout_session_id = checkout_session['id']

            current_app.logger.info(f"stripe_checkout_session_id={stripe_checkout_session_id} ( {session['account']}, ${dollars} )")

            get_model().create_payment_session("stripe", stripe_checkout_session_id, session["account"], dollars)

            # We can't do this because stripe requires a bunch of server-authenticated data to be sent in the hash
            # of the URL. I briefly looked into reverse-engineering their proprietary javascript in order to try to figure out
            # how it works and gave up after I discovered that it would require multiple complex interactions with stripe's
            # servers, and it looked like they were trying to make what I was trying to do impossible.

            # I never tried running the stripe proprietary javascript in a headless browser and passing the hash from the
            # headless browser to the client, but I suspect it might not work anyway because they probably have their tracking
            # cookie info in there somewhere, and if the cookie doesn't match they may refuse to display the page.

            #return redirect(f"https://checkout.stripe.com/pay/{stripe_checkout_session_id}")

            return redirect(f"/payment/stripe/{stripe_checkout_session_id}")

    for error in errors:
        flash(error)

    return render_template("stripe.html")


def _sanitize_stripe_checkout_session_id(stripe_checkout_session_id):
    """Return the id unchanged if it is empty or well-formed, else a harmless placeholder."""
    if stripe_checkout_session_id and not _STRIPE_CHECKOUT_SESSION_ID_PATTERN.fullmatch(stripe_checkout_session_id):
        return '___________'
    return stripe_checkout_session_id


@bp.route("/stripe/<string:stripe_checkout_session_id>")
@account_required
def redirect_to_stripe(stripe_checkout_session_id):
    """Render ``stripe.html`` for the given checkout session id.

    The id comes from the URL, so it is sanitized before being handed to the
    template. The response carries a Content-Security-Policy that only allows
    this site and https://js.stripe.com.
    """
    stripe_checkout_session_id = _sanitize_stripe_checkout_session_id(stripe_checkout_session_id)

    response = make_response(render_template(
        "stripe.html",
        stripe_checkout_session_id=stripe_checkout_session_id,
        stripe_public_key=current_app.config["STRIPE_PUBLISHABLE_KEY"]
    ))

    response.headers['Content-Security-Policy'] = "default-src 'self' https://js.stripe.com"

    return response


@bp.route("/stripe/<string:stripe_checkout_session_id>/json")
@account_required
def stripe_checkout_session_json(stripe_checkout_session_id):
    """Return JSON ``{"hasRedirectedAlready": ...}`` for the given checkout session id.

    Responds with 404 when ``payment_session_redirect`` returns None for the
    current account and session id.
    """
    stripe_checkout_session_id = _sanitize_stripe_checkout_session_id(stripe_checkout_session_id)

    has_redirected_already = get_model().payment_session_redirect(session['account'], stripe_checkout_session_id)

    if has_redirected_already is None:
        abort(404, "Not Found")

    return jsonify(dict(hasRedirectedAlready=has_redirected_already))


def validate_stripe_checkout_session(stripe_checkout_session_id):
    """Look for a recent ``checkout.session.completed`` event for this session and consume it.

    Args:
        stripe_checkout_session_id: id of the stripe checkout session to look for.

    Returns:
        ``dict(email=..., dollars=...)`` when a matching event was found and
        the payment session was consumed, otherwise None.
    """
    checkout_session_completed_events = stripe.Event.list(
        type='checkout.session.completed',
        created={
            # Check for events created in the last half hour
            'gte': int(time.time() - (30 * 60)),
        },
    )

    for event in checkout_session_completed_events.auto_paging_iter():
        checkout_session = event['data']['object']

        if checkout_session and 'id' in checkout_session and checkout_session['id'] == stripe_checkout_session_id:
            cents = checkout_session['amount_total']
            dollars = decimal.Decimal(cents)/100

            #consume_payment_session deletes the checkout session row and inserts a payment row
            # its ok to call consume_payment_session more than once because it only takes an action if the session exists
            success_email = get_model().consume_payment_session("stripe", stripe_checkout_session_id, dollars)

            if success_email is not None:
                check_if_shortterm_flag_can_be_unset(success_email)

            if success_email:
                return dict(email=success_email, dollars=dollars)

    return None


@bp.route("/stripe/success", methods=("GET",))
def success():
    """Landing page after stripe checkout: confirm the payment, then redirect.

    Responds with 400 when ``session_id`` is missing from the query string or
    when the session is still not paid after several attempts.
    """
    stripe_checkout_session_id = request.args.get('session_id')
    if not stripe_checkout_session_id:
        current_app.logger.info("/payment/stripe/success returned 400: missing required URL parameter session_id")
        abort(400, "missing required URL parameter session_id")
    else:
        max_attempts = 5
        for attempt in range(max_attempts):
            paid = validate_stripe_checkout_session(stripe_checkout_session_id)
            if paid:
                current_app.logger.info(f"{paid['email']} paid ${paid['dollars']} successfully (stripe_checkout_session_id={stripe_checkout_session_id})")
                return redirect(url_for("console.account_balance"))

            # wait between attempts only; there is nothing to wait for after the last one
            if attempt < max_attempts - 1:
                sleep(1)

        abort(400, "this checkout session is not paid yet")


# webhook is not needed
# @bp.route("/webhook", methods=("POST",))
# def webhook():
#
#     request_data = json.loads(request.data)
#     signature = request.headers.get('stripe-signature')
#     try:
#         event = stripe.Webhook.construct_event(
#             payload=request_data,
#             sig_header=signature,
#             secret=current_app.config['STRIPE_WEBHOOK_SECRET']
#         )
#         if event['type'] == 'checkout.session.completed':
#             dollars = event['data']['object']['amount_total']
#             stripe_checkout_session_id = event['data']['object']['id']

#             #consume_payment_session deletes the checkout session row and inserts a payment row
#             # its ok to call consume_payment_session more than once because it only takes an action if the session exists
#             success_account = get_model().consume_payment_session("stripe", stripe_checkout_session_id, dollars)
#             if success_account:
#                 print(f"{success_account} paid ${dollars} successfully (stripe_checkout_session_id={stripe_checkout_session_id})")

#         return jsonify({'status': 'success'})
#     except ValueError as e:
#         print("/payment/stripe/webhook returned 400: bad request", e)
#         abort(400, "bad request")
#     except stripe.error.SignatureVerificationError:
#         print("/payment/stripe/webhook returned 400: invalid signature")
#         abort(400, "invalid signature")