Verifying Webhook Authenticity
One crucial step when receiving a webhook is verifying whether the request genuinely originated from Avenia API. To ensure this, we provide a Public Key that you can use to verify the digital signature included in each webhook request.
Every webhook sent by Avenia API contains a signature header. By using our public key to validate this signature against the request body, you can confirm that:
The request was sent by Avenia API.
The contents of the request were not altered during transmission.
This verification step is essential to guarantee the security and authenticity of the communication between your system and Avenia API.
Get Avenia API's Public Key
HTTP GET Request
https://api.sandbox.avenia.io:10952/v2/public-key
cURL Example
curl https://api.sandbox.avenia.io:10952/v2/public-key
JSON Response
Do not use the public key shown below. We strongly recommend retrieving it directly from the endpoint, as this key may be rotated or updated at any time.
{
"publicKey": "-----BEGIN RSA PUBLIC KEY-----\nMIIBCgKCAQEAtqSxz7KuZwAw5VIhtwx7Tk+Hu4LclF0DyKws4tlEwPCDPSCh9bNh\nUNJREy5qg4W7wMGk16wCViKIvqTL+NmrEErLAFfZso5+g9xZx+5kZeYqaB+i7LA7\nGpDveSmlQEeU4G81S8HgFuTx0XkGDAAH6KiNWIyMmDcfX6YIg8ciYH3XQ+ocste9\nggnql0t3ZEGCrGW41+2cKeUowAIdT/mb5Yql93uoWFvJaS1o/zFDhNvhdwv12bDC\n2y7Y4BR7rguZdtqXKCqWJSnaJZoRmTOuTIIMuxFJFmBLgxrt/NKPXyxRBjoqUY5H\nQ3YpHxfNxw9R2tqEHqUA7K4K4dzVdSaDhwIDAQAB\n-----END RSA PUBLIC KEY-----\n"
}
With the up-to-date Public Key in hand, you can verify that the webhook's origin is authentic and that its contents have not been tampered with.
Webhook Signature Verification in Practice
The following code is written in Python for readability. The syntax varies across programming languages, but the core logic for verifying the origin of a webhook stays the same.
Let's begin by importing the required libraries:
import base64
import json
import logging
import requests
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.hazmat.primitives.serialization import load_pem_public_key
from flask import Flask, request, Response
app = Flask(__name__)
logging.basicConfig(level=logging.INFO)
Nothing special here: these imports are everything we need to perform the verification in Python.
Getting the Public Key
Remember that the sandbox and production environments are separate APIs and, consequently, their public keys differ.
def get_public_key():
    # Sandbox endpoint; production uses a different URL and a different key.
    api = "https://api.sandbox.avenia.io:10952/v2/public-key"
    try:
        # A timeout keeps the caller from hanging if the API is unreachable.
        response = requests.get(api, timeout=10)
        response.raise_for_status()
        key_response = response.json()
        public_key_pem = key_response.get("publicKey")
        if not public_key_pem:
            raise ValueError("Empty public key received")
        public_key = load_pem_public_key(public_key_pem.encode())
        if not isinstance(public_key, rsa.RSAPublicKey):
            raise TypeError("Key is not RSA")
        return public_key
    except requests.RequestException as e:
        raise Exception(f"Failed to fetch public key: {str(e)}") from e
    except json.JSONDecodeError as e:
        raise Exception(f"Failed to parse response: {str(e)}") from e
    except (ValueError, TypeError) as e:
        raise Exception(str(e)) from e
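Since the key can be rotated at any time, you may prefer to cache it for a short period rather than fetch it for every webhook. The following is a minimal sketch of that idea, not part of the Avenia API: the `CachedKey` class, its `ttl_seconds` and `clock` parameters, and the 300-second default are all hypothetical choices made for illustration.

```python
import time
from typing import Callable


class CachedKey:
    """Cache the result of a key-fetching callable for ttl_seconds (hypothetical helper)."""

    def __init__(self, fetch: Callable[[], object], ttl_seconds: float = 300.0,
                 clock: Callable[[], float] = time.monotonic):
        self._fetch = fetch          # e.g. the get_public_key function above
        self._ttl = ttl_seconds      # arbitrary default; tune to your needs
        self._clock = clock          # injectable so the cache can be tested
        self._key = None
        self._expires_at = 0.0

    def get(self, force_refresh: bool = False):
        """Return the cached key, refetching when it is missing, expired, or forced."""
        now = self._clock()
        if force_refresh or self._key is None or now >= self._expires_at:
            self._key = self._fetch()
            self._expires_at = now + self._ttl
        return self._key
```

A possible usage is `key_cache = CachedKey(get_public_key)` at module level and `key_cache.get()` inside the request handler. If verification fails, retrying once with `key_cache.get(force_refresh=True)` is one way to cope with a key that was rotated while the old one was still cached.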
Verifying Signature from Avenia API
In short, we read the signature from the Signature request header, Base64-decode it, and verify it against the raw request body using the public key (RSA-PSS with SHA-256). If the signature is indeed from Avenia API, we return True; otherwise, we return False.
def verify_signature(body, signature_base64, public_key):
    try:
        # The header carries the signature Base64-encoded.
        signature = base64.b64decode(signature_base64)
        # verify() returns None on success and raises on any mismatch.
        public_key.verify(
            signature,
            body,
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH
            ),
            hashes.SHA256()
        )
        return True
    except Exception as e:
        logging.error("Verification error: %s", e)
        return False
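To try the verification logic without waiting for a real webhook, you can sign a payload with a throwaway key pair and check that verification succeeds for the original bytes and fails for modified ones. This is only a local sketch: the names `sign_for_test`, `is_valid`, and `PSS_PADDING` are hypothetical, the key pair is generated on the spot, and the padding parameters simply mirror the ones used in `verify_signature` above.

```python
import base64

from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa

# Same PSS parameters as verify_signature above.
PSS_PADDING = padding.PSS(
    mgf=padding.MGF1(hashes.SHA256()),
    salt_length=padding.PSS.MAX_LENGTH,
)


def sign_for_test(private_key, body: bytes) -> str:
    """Sign body with a local test key and return the Base64 signature."""
    signature = private_key.sign(body, PSS_PADDING, hashes.SHA256())
    return base64.b64encode(signature).decode("ascii")


def is_valid(public_key, body: bytes, signature_base64: str) -> bool:
    """Return True only if the signature matches body under public_key."""
    try:
        signature = base64.b64decode(signature_base64)
        public_key.verify(signature, body, PSS_PADDING, hashes.SHA256())
        return True
    except (InvalidSignature, ValueError):
        return False


# Throwaway key pair standing in for the real signer (test use only).
test_private_key = rsa.generate_private_key(public_exponent=65537, key_size=2048)
test_public_key = test_private_key.public_key()

body = b'{"event": "example"}'
signature_b64 = sign_for_test(test_private_key, body)

original_ok = is_valid(test_public_key, body, signature_b64)
tampered_ok = is_valid(test_public_key, body + b" ", signature_b64)
```

Here `original_ok` is True and `tampered_ok` is False, because even a single extra byte in the body invalidates the signature.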
Putting It All Together
Here we combine all the previously defined functions and start listening on port 8080 to receive incoming webhook attempts. Just a reminder: in order to receive a webhook, you first need to register a webhook.
@app.route('/', methods=['POST'])
def handler():
    try:
        public_key = get_public_key()
    except Exception as e:
        logging.error(f"Error obtaining public key: {str(e)}")
        return Response("Internal Server Error", status=500)

    signature_base64 = request.headers.get('Signature')
    if not signature_base64:
        return Response("Signature header is required", status=401)

    # Use the raw bytes exactly as received; the signature covers them.
    body = request.get_data()
    if verify_signature(body, signature_base64, public_key):
        return Response("Valid webhook received", status=200)
    else:
        return Response("Invalid signature", status=401)


if __name__ == "__main__":
    logging.info("Listening on port 8080...")
    app.run(host='0.0.0.0', port=8080)
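Note that the handler verifies the bytes returned by `request.get_data()`, not a parsed and re-serialized copy. The short sketch below, which uses a made-up payload, illustrates why: parsing JSON and dumping it again can change whitespace, so the resulting bytes may no longer match what was signed even though the data is equivalent.

```python
import json

# Made-up payload with two spaces after the comma.
raw_body = b'{"id": 1,  "amount": "10.50"}'

parsed = json.loads(raw_body)
reserialized = json.dumps(parsed).encode()

# The data is the same, but the bytes differ, so a signature computed
# over raw_body would not verify against reserialized.
bodies_match = raw_body == reserialized
same_data = parsed == json.loads(reserialized)
```

For this reason it is usually safest to verify the signature first and only then parse the body.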
Expected Output
If you follow the steps and trigger an action that generates a valid event, you should see log output from the server similar to this:
INFO:werkzeug:127.0.0.1 - - [99/Apr/9999 99:99:42] "POST / HTTP/1.1" 200 -
If the signature is invalid:
INFO:werkzeug:127.0.0.1 - - [99/Apr/9999 99:99:42] "POST / HTTP/1.1" 401 -
Check the valid events here
Full Code
import base64
import json
import logging

import requests
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import padding, rsa
from cryptography.hazmat.primitives.serialization import load_pem_public_key
from flask import Flask, request, Response

app = Flask(__name__)
logging.basicConfig(level=logging.INFO)


def get_public_key():
    # Sandbox endpoint; production uses a different URL and a different key.
    api = "https://api.sandbox.avenia.io:10952/v2/public-key"
    try:
        # A timeout keeps the caller from hanging if the API is unreachable.
        response = requests.get(api, timeout=10)
        response.raise_for_status()
        key_response = response.json()
        public_key_pem = key_response.get("publicKey")
        if not public_key_pem:
            raise ValueError("Empty public key received")
        public_key = load_pem_public_key(public_key_pem.encode())
        if not isinstance(public_key, rsa.RSAPublicKey):
            raise TypeError("Key is not RSA")
        return public_key
    except requests.RequestException as e:
        raise Exception(f"Failed to fetch public key: {str(e)}") from e
    except json.JSONDecodeError as e:
        raise Exception(f"Failed to parse response: {str(e)}") from e
    except (ValueError, TypeError) as e:
        raise Exception(str(e)) from e


def verify_signature(body, signature_base64, public_key):
    try:
        # The header carries the signature Base64-encoded.
        signature = base64.b64decode(signature_base64)
        # verify() returns None on success and raises on any mismatch.
        public_key.verify(
            signature,
            body,
            padding.PSS(
                mgf=padding.MGF1(hashes.SHA256()),
                salt_length=padding.PSS.MAX_LENGTH
            ),
            hashes.SHA256()
        )
        return True
    except Exception as e:
        logging.error("Verification error: %s", e)
        return False


@app.route('/', methods=['POST'])
def handler():
    try:
        public_key = get_public_key()
    except Exception as e:
        logging.error(f"Error obtaining public key: {str(e)}")
        return Response("Internal Server Error", status=500)

    signature_base64 = request.headers.get('Signature')
    if not signature_base64:
        return Response("Signature header is required", status=401)

    # Use the raw bytes exactly as received; the signature covers them.
    body = request.get_data()
    if verify_signature(body, signature_base64, public_key):
        return Response("Valid webhook received", status=200)
    else:
        return Response("Invalid signature", status=401)


if __name__ == "__main__":
    logging.info("Listening on port 8080...")
    app.run(host='0.0.0.0', port=8080)