"""Payment blueprint: BTCPay and Stripe checkout flows.

Every route in this module is registered on ``bp`` under the ``/payment``
URL prefix.
"""

import decimal
import json
import re
import sys
import time
from time import sleep

import stripe
from flask import Blueprint
from flask import make_response
from flask import request
from flask import current_app
from flask import session
from flask import redirect
from flask import url_for
from flask import jsonify
from flask import flash
from flask import render_template
from werkzeug.exceptions import abort

from capsulflask.auth import account_required
from capsulflask.db import get_model
from capsulflask.shared import my_exec_info_message

bp = Blueprint("payment", __name__, url_prefix="/payment")

# Characters allowed in a stripe checkout session id that gets rendered into
# a template or looked up in the database.
_STRIPE_SESSION_ID_PATTERN = re.compile(r"^[a-zA-Z0-9_=-]+$")

# Harmless placeholder substituted for any session id that fails the pattern.
_INVALID_STRIPE_SESSION_ID = '___________'


def _sanitize_stripe_checkout_session_id(stripe_checkout_session_id):
    """Return the id unchanged if it is well formed, else a placeholder.

    Falsy ids (``None``, empty string) are passed through untouched.
    """
    if (stripe_checkout_session_id
            and not _STRIPE_SESSION_ID_PATTERN.match(stripe_checkout_session_id)):
        return _INVALID_STRIPE_SESSION_ID
    return stripe_checkout_session_id


def validate_dollars():
    """Parse and validate the ``dollars`` field of the submitted form.

    Returns:
        A two-element list ``[errors, dollars]``. ``errors`` is a list of
        human-readable messages (empty when the input is valid). ``dollars``
        is a ``decimal.Decimal`` when the input is valid and ``None``
        otherwise.
    """
    errors = []

    if "dollars" not in request.form:
        errors.append("dollars is required")
        return [errors, None]

    try:
        dollars = decimal.Decimal(request.form["dollars"])
    except (decimal.InvalidOperation, ValueError, TypeError):
        errors.append("dollars must be a number")
        return [errors, None]

    # Decimal accepts "NaN" and "Infinity"; those would pass as "a number"
    # and then blow up in int(dollars*100) / float(dollars) further down.
    if not dollars.is_finite():
        errors.append("dollars must be a number")
        return [errors, None]

    # A zero or negative payment can never be a valid top-up.
    if dollars <= 0:
        errors.append("dollars must be greater than zero")
        return [errors, None]

    # TODO re enable this
    # if dollars and dollars < decimal.Decimal(1):
    #     errors.append("dollars must be >= 1")

    return [errors, dollars]


@bp.route("/btcpay", methods=("GET", "POST"))
@account_required
def btcpay_payment():
    """Show the BTCPay form (GET) or create an invoice and redirect to it (POST).

    When BTCPay is disabled in the app config the user is sent back to the
    account balance page with a flash message. Validation errors are flashed
    and the form is rendered again.
    """
    errors = []

    if not current_app.config['BTCPAY_ENABLED']:
        flash("BTCPay is not enabled on this server")
        return redirect(url_for("console.account_balance"))

    if request.method == "POST":
        errors, dollars = validate_dollars()

        if not errors:
            invoice = current_app.config['BTCPAY_CLIENT'].create_invoice(dict(
                price=float(dollars),
                currency="USD",
                itemDesc="Capsul Cloud Compute",
                transactionSpeed="high",
                redirectURL=f"{current_app.config['BASE_URL']}/console/account-balance",
                notificationURL=f"{current_app.config['BASE_URL']}/payment/btcpay/webhook"
            ))

            current_app.logger.info(f"created btcpay invoice: {invoice}")

            # print(invoice)
            current_app.logger.info(f"created btcpay invoice_id={invoice['id']} ( {session['account']}, ${dollars} )")

            get_model().create_payment_session("btcpay", invoice["id"], session["account"], dollars)

            return redirect(invoice["url"])

    for error in errors:
        flash(error)

    return render_template("btcpay.html")


def poll_btcpay_session(invoice_id):
    """Fetch an invoice from the BTCPay server and apply its status locally.

    Args:
        invoice_id: id of the BTCPay invoice to look up.

    Returns:
        A two-element list ``[http_status, message]``: ``[503, ...]`` when the
        BTCPay server could not be contacted, ``[400, ...]`` when the invoice
        is not in USD, and ``[200, "ok"]`` otherwise.
    """
    try:
        invoice = current_app.config['BTCPAY_CLIENT'].get_invoice(invoice_id)
    except Exception:
        # Any client failure is reported as "service unavailable"; SystemExit
        # and KeyboardInterrupt are deliberately allowed to propagate.
        current_app.logger.error(f""" error was thrown when contacting btcpay server: {my_exec_info_message(sys.exc_info())}"""
        )
        return [503, "error was thrown when contacting btcpay server"]

    if invoice['currency'] != "USD":
        return [400, "invalid currency"]

    dollars = invoice['price']
    status = invoice['status']

    current_app.logger.info(f"poll_btcpay_session invoice_id={invoice_id}, status={status} dollars={dollars}")

    if status in ("paid", "confirmed", "complete"):
        success_account = get_model().consume_payment_session("btcpay", invoice_id, dollars)

        if success_account:
            current_app.logger.info(f"{success_account} paid ${dollars} successfully (btcpay_invoice_id={invoice_id})")

        # NOTE(review): resolution is checked independently of success_account,
        # since the session may already have been consumed on an earlier
        # "paid"/"confirmed" notification — confirm against the model.
        if status == "complete":
            get_model().btcpay_invoice_resolved(invoice_id, True)

    elif status in ("expired", "invalid"):
        get_model().btcpay_invoice_resolved(invoice_id, False)
        get_model().delete_payment_session("btcpay", invoice_id)

    return [200, "ok"]


@bp.route("/btcpay/webhook", methods=("POST",))
def btcpay_webhook():
    """Handle a BTCPay notification by re-polling the referenced invoice.

    Returns:
        The ``(message, http_status)`` pair produced by
        ``poll_btcpay_session``, or a 400 response when the request body is
        not a JSON object containing an ``id``.
    """
    current_app.logger.info("got btcpay webhook")

    # IMPORTANT! there is no signature or credential to authenticate the data sent into this webhook :facepalm:
    # it's just a notification, that's all.
    try:
        request_data = json.loads(request.data)
        invoice_id = request_data['id']
    except (ValueError, TypeError, KeyError):
        # Anyone can post here, so a malformed body must not become a 500.
        return "bad request: expected a JSON object with an id", 400

    # so you better make sure to get the invoice data directly from the horse's mouth!
    result = poll_btcpay_session(invoice_id)

    return result[1], result[0]


@bp.route("/stripe", methods=("GET", "POST"))
@account_required
def stripe_payment():
    """Show the Stripe form (GET) or create a checkout session (POST).

    On a valid POST a Stripe checkout session is created, recorded as a
    payment session for the logged-in account, and the user is redirected to
    the local page that hands off to Stripe. Validation errors are flashed
    and the form is rendered again.
    """
    errors = []

    if request.method == "POST":
        errors, dollars = validate_dollars()

        if not errors:
            current_app.logger.info(f"creating stripe checkout session for {session['account']}, ${dollars}")

            checkout_session = stripe.checkout.Session.create(
                success_url=current_app.config['BASE_URL'] + "/payment/stripe/success?session_id={CHECKOUT_SESSION_ID}",
                cancel_url=current_app.config['BASE_URL'] + "/payment/stripe",
                payment_method_types=["card"],
                #customer_email=session["account"],
                line_items=[
                    {
                        "name": "Capsul Cloud Compute",
                        "images": [current_app.config['BASE_URL']+"/static/capsul-product-image.png"],
                        "quantity": 1,
                        "currency": "usd",
                        "amount": int(dollars*100)
                    }
                ]
            )
            stripe_checkout_session_id = checkout_session['id']

            current_app.logger.info(f"stripe_checkout_session_id={stripe_checkout_session_id} ( {session['account']}, ${dollars} )")

            get_model().create_payment_session("stripe", stripe_checkout_session_id, session["account"], dollars)

            # We can't do this because stripe requires a bunch of server-authenticated data to be sent in the hash
            # of the URL. I briefly looked into reverse-engineering their proprietary javascript in order to try to figure out
            # how it works and gave up after I discovered that it would require multiple complex interactions with stripe's
            # servers, and it looked like they were trying to make what I was trying to do impossible.

            # I never tried running the stripe proprietary javascript in a headless browser and passing the hash from the
            # headless browser to the client, but I suspect it might not work anyway because they probably have their tracking
            # cookie info in there somewhere, and if the cookie doesn't match they may refuse to display the page.

            #return redirect(f"https://checkout.stripe.com/pay/{stripe_checkout_session_id}")

            return redirect(f"/payment/stripe/{stripe_checkout_session_id}")

    for error in errors:
        flash(error)

    return render_template("stripe.html")


# The URL variable is required: Flask passes it to the view as the
# ``stripe_checkout_session_id`` keyword argument, and stripe_payment()
# redirects to /payment/stripe/{id}.
@bp.route("/stripe/<stripe_checkout_session_id>")
@account_required
def redirect_to_stripe(stripe_checkout_session_id):
    """Render the page that hands the browser off to Stripe checkout.

    Args:
        stripe_checkout_session_id: checkout session id taken from the URL;
            replaced by a placeholder if it contains unexpected characters.

    Returns:
        The rendered ``stripe.html`` response with a Content-Security-Policy
        header that allows scripts from ``https://js.stripe.com``.
    """
    stripe_checkout_session_id = _sanitize_stripe_checkout_session_id(stripe_checkout_session_id)

    response = make_response(render_template(
        "stripe.html",
        stripe_checkout_session_id=stripe_checkout_session_id,
        stripe_public_key=current_app.config["STRIPE_PUBLISHABLE_KEY"]
    ))

    response.headers['Content-Security-Policy'] = "default-src 'self' https://js.stripe.com"

    return response


@bp.route("/stripe/<stripe_checkout_session_id>/json")
@account_required
def stripe_checkout_session_json(stripe_checkout_session_id):
    """Report as JSON whether this checkout session has already redirected.

    Args:
        stripe_checkout_session_id: checkout session id taken from the URL;
            replaced by a placeholder if it contains unexpected characters.

    Returns:
        A JSON object ``{"hasRedirectedAlready": ...}``. Aborts with 404 when
        the model returns ``None`` for this account and session id.
    """
    stripe_checkout_session_id = _sanitize_stripe_checkout_session_id(stripe_checkout_session_id)

    has_redirected_already = get_model().payment_session_redirect(session['account'], stripe_checkout_session_id)

    if has_redirected_already is None:
        abort(404, "Not Found")

    return jsonify(dict(hasRedirectedAlready=has_redirected_already))


def validate_stripe_checkout_session(stripe_checkout_session_id):
    """Look for a recent completed-checkout event and consume the session.

    Scans ``checkout.session.completed`` events created in the last half
    hour for one whose object id equals ``stripe_checkout_session_id``.

    Args:
        stripe_checkout_session_id: id of the checkout session to validate.

    Returns:
        ``dict(email=..., dollars=...)`` when a matching event was found and
        the payment session was consumed by this call, otherwise ``None``.
    """
    checkout_session_completed_events = stripe.Event.list(
        type='checkout.session.completed',
        created={
            # Check for events created in the last half hour
            'gte': int(time.time() - (30 * 60)),
        },
    )

    for event in checkout_session_completed_events.auto_paging_iter():
        checkout_session = event['data']['object']

        if checkout_session and 'id' in checkout_session and checkout_session['id'] == stripe_checkout_session_id:
            cents = checkout_session['amount_total']
            dollars = decimal.Decimal(cents)/100

            #consume_payment_session deletes the checkout session row and inserts a payment row
            # its ok to call consume_payment_session more than once because it only takes an action if the session exists
            success_email = get_model().consume_payment_session("stripe", stripe_checkout_session_id, dollars)

            if success_email:
                return dict(email=success_email, dollars=dollars)

    return None


@bp.route("/stripe/success", methods=("GET",))
def success():
    """Landing page Stripe redirects to after a successful checkout.

    Requires a ``session_id`` query parameter. Checks up to five times, one
    second apart, whether the session has been paid; redirects to the account
    balance page once it has, and aborts with 400 otherwise.
    """
    stripe_checkout_session_id = request.args.get('session_id')

    if not stripe_checkout_session_id:
        current_app.logger.info("/payment/stripe/success returned 400: missing required URL parameter session_id")
        abort(400, "missing required URL parameter session_id")

    # The completed event may not be visible yet, so retry before giving up.
    for _ in range(0, 5):
        paid = validate_stripe_checkout_session(stripe_checkout_session_id)
        if paid:
            current_app.logger.info(f"{paid['email']} paid ${paid['dollars']} successfully (stripe_checkout_session_id={stripe_checkout_session_id})")
            return redirect(url_for("console.account_balance"))
        sleep(1)

    abort(400, "this checkout session is not paid yet")


# webhook is not needed
# @bp.route("/webhook", methods=("POST",))
# def webhook():
#
#     request_data = json.loads(request.data)
#     signature = request.headers.get('stripe-signature')
#     try:
#         event = stripe.Webhook.construct_event(
#             payload=request_data,
#             sig_header=signature,
#             secret=current_app.config['STRIPE_WEBHOOK_SECRET']
#         )
#         if event['type'] == 'checkout.session.completed':
#             dollars = event['data']['object']['amount_total']
#             stripe_checkout_session_id = event['data']['object']['id']
#
#             #consume_payment_session deletes the checkout session row and inserts a payment row
#             # its ok to call consume_payment_session more than once because it only takes an action if the session exists
#             success_account = get_model().consume_payment_session("stripe", stripe_checkout_session_id, dollars)
#             if success_account:
#                 print(f"{success_account} paid ${dollars} successfully (stripe_checkout_session_id={stripe_checkout_session_id})")
#
#         return jsonify({'status': 'success'})
#     except ValueError as e:
#         print("/payment/stripe/webhook returned 400: bad request", e)
#         abort(400, "bad request")
#     except stripe.error.SignatureVerificationError:
#         print("/payment/stripe/webhook returned 400: invalid signature")
#         abort(400, "invalid signature")