Skip to main content

When to use webhooks

Webhooks are better than polling when:
  • You have many concurrent submissions
  • You want real-time notifications
  • Your API server resources are limited
  • You prefer an event-driven architecture
Polling is fine for:
  • Occasional submissions
  • Simple scripts and CLIs
  • Quick prototypes and testing

Setting up webhooks

Step 1: Prepare your endpoint

Create an HTTPS endpoint that can receive POST requests. The delivered envelope uses Standard Webhooks, so verify the signature with the standardwebhooks library and your subscription secret. The verified event exposes type (the event type) and data (a snake_case object).
from fastapi import FastAPI, Request
from standardwebhooks.webhooks import Webhook

app = FastAPI()
wh = Webhook("whsec_AbCdEf...")  # Your subscription secret

@app.post("/webhooks/integrations")
async def handle_webhook(request: Request):
    try:
        # Verify the webhook signature
        payload = await request.body()
        event = wh.verify(payload, dict(request.headers))
    except Exception as e:
        print(f"Invalid signature: {e}")
        return {"error": "Unauthorized"}, 401

    # Process the event
    print(f"Type: {event['type']}")
    print(f"Data: {event['data']}")

    # Respond immediately
    return {"status": "received"}

Step 2: Create a subscription

Use POST /api/integrations/webhooks/subscriptions. The request fields are endpointUrl (HTTPS required), eventTypes, and an optional description.
curl -X POST \
  -H "X-API-Key: your-api-key-here" \
  -H "Content-Type: application/json" \
  -d '{
    "endpointUrl": "https://your-domain.com/webhooks/integrations",
    "eventTypes": ["asset.processing.completed", "asset.processing.failed"],
    "description": "Production webhook endpoint"
  }' \
  https://mm-midmarket-integrations-api-preview.azurewebsites.net/api/integrations/webhooks/subscriptions
Response (201):
{
  "id": "550e8400-...",
  "workspaceId": "660e8400-...",
  "endpointUrl": "https://your-domain.com/webhooks/integrations",
  "eventTypes": ["asset.processing.completed", "asset.processing.failed"],
  "description": "Production webhook endpoint",
  "enabled": true,
  "secret": "whsec_AbCdEf...",
  "createdAt": "2026-06-03T12:00:00Z",
  "updatedAt": "2026-06-03T12:00:00Z"
}
The secret is returned only on create. Save it immediately — it is never shown again. The field is secret (not signing_secret), the enabled flag is enabled (not active), and the event list is eventTypes (not events).
A non-HTTPS endpointUrl or hitting your workspace subscription limit returns 422.

Step 3: Test your endpoint

Create a submission and watch your endpoint receive events:
import requests

API_BASE = "https://mm-midmarket-integrations-api-preview.azurewebsites.net"

response = requests.post(
    f"{API_BASE}/api/integrations/submissions",
    headers={"X-API-Key": "your-api-key-here"},
    json={
        "assets": [{"blobPath": "<workspace-id>/test.mp4"}],
        "sidekickIds": [],
    },
)

print(f"Submission created: {response.json()['submissionId']}")
print("Watch your webhook endpoint for incoming events...")

Webhook events

There are exactly two event types. The delivered envelope has id, type, timestamp, and a snake_case data object. The payload does not embed results — fetch detail by ID after receiving an event.

asset.processing.completed

Fires when an asset finishes processing successfully.
{
  "id": "evt_uuid",
  "type": "asset.processing.completed",
  "timestamp": "2026-06-03T12:05:00Z",
  "data": {
    "asset_id": "a1...",
    "submission_id": "s1...",
    "workspace_id": "w1...",
    "status": "completed",
    "filename": "campaign-video.mp4",
    "issue_count": 3
  }
}

asset.processing.failed

Fires when an asset fails to process. The data object adds error_message.
{
  "id": "evt_uuid",
  "type": "asset.processing.failed",
  "timestamp": "2026-06-03T12:10:00Z",
  "data": {
    "asset_id": "a1...",
    "submission_id": "s1...",
    "workspace_id": "w1...",
    "status": "failed",
    "error_message": "...",
    "filename": "bad.mp4"
  }
}

Handling webhooks

Basic handler pattern

Read event["type"] and the snake_case data fields. Because the payload carries no results, fetch issues or topics by asset ID when you need detail.
import requests

API_BASE = "https://mm-midmarket-integrations-api-preview.azurewebsites.net"
API_KEY = "your-api-key-here"

def handle_webhook(event: dict):
    """Process a verified webhook event."""

    event_type = event["type"]
    data = event["data"]
    submission_id = data["submission_id"]
    asset_id = data["asset_id"]

    if event_type == "asset.processing.completed":
        # Fetch the enriched issue list by ID — the payload has no results
        issues = requests.get(
            f"{API_BASE}/api/integrations/submissions/{submission_id}"
            f"/assets/{asset_id}/issues",
            headers={"X-API-Key": API_KEY},
        ).json()

        save_to_database(asset_id=asset_id, issues=issues["issues"])
        notify_user(asset_id, "processing_complete")

    elif event_type == "asset.processing.failed":
        save_error(
            asset_id=asset_id,
            error_message=data.get("error_message"),
        )
        notify_user(asset_id, "processing_failed")
If you prefer clustered topics instead of a flat issue list, fetch .../assets/{asset_id}/topics the same way.

Queue-based processing

For high-volume scenarios, queue webhooks for async processing and respond immediately:
import json
from celery import shared_task

@app.post("/webhooks/integrations")
async def handle_webhook(request: Request):
    # Verify signature
    try:
        event = verify_webhook(request)
    except Exception:
        return {"error": "Unauthorized"}, 401

    # Queue for async processing
    process_webhook.delay(json.dumps(event))

    # Respond immediately
    return {"status": "received"}

@shared_task
def process_webhook(event_json: str):
    """Process the webhook asynchronously."""

    event = json.loads(event_json)

    if event["type"] == "asset.processing.completed":
        fetch_and_store_results(event["data"])
    elif event["type"] == "asset.processing.failed":
        handle_error(event["data"])

Webhook reliability

Signature verification

Always verify webhook signatures with the subscription secret:
import os
from standardwebhooks.webhooks import Webhook, WebhookVerificationError

async def verify_webhook(request) -> dict:
    wh = Webhook(os.getenv("WEBHOOK_SECRET"))

    try:
        return wh.verify(
            payload=await request.body(),
            headers=dict(request.headers),
        )
    except WebhookVerificationError as e:
        raise ValueError(f"Invalid webhook: {e}")

Idempotency

Deliveries may arrive more than once. The delivery provider retries failed deliveries, so make your handler idempotent by deduplicating on the event id:
def process_webhook_idempotent(event: dict):
    """Process each event at most once."""

    event_id = event["id"]

    if already_processed(event_id):
        return {"status": "already_processed"}

    process_event(event)
    mark_as_processed(event_id)

    return {"status": "processed"}
Store processed event IDs in your database:
CREATE TABLE webhook_events (
  id VARCHAR(255) PRIMARY KEY,
  event_type VARCHAR(255),
  submission_id VARCHAR(255),
  processed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
Failed deliveries are retried automatically by the delivery provider. Check the deliveries endpoint (GET .../subscriptions/{id}/deliveries) or the control plane for attempt history rather than assuming a fixed retry schedule.

Testing webhooks locally

Using ngrok

Expose your local server publicly:
ngrok http 3000
This gives you a URL like https://abc123.ngrok.io. Use it as the endpointUrl in your subscription.

Test endpoint

Use POST /health/webhook-test to validate that your signature verification is set up correctly, without affecting production data.
curl -X POST https://mm-midmarket-integrations-api-preview.azurewebsites.net/health/webhook-test \
  -H "Content-Type: application/json" \
  -d '{ ... }'

Managing subscriptions

MethodPathPurpose
GET/api/integrations/webhooks/subscriptionsList subscriptions (paginated)
GET/api/integrations/webhooks/subscriptions/{id}Fetch one subscription
PATCH/api/integrations/webhooks/subscriptions/{id}Enable or disable ({ "enabled": false })
DELETE/api/integrations/webhooks/subscriptions/{id}Delete (returns 204)
GET/api/integrations/webhooks/subscriptions/{id}/deliveriesView delivery attempts
The list endpoint returns a paginated envelope: { "items": [...], "page": 1, "pageSize": 10, "total": 3, "totalPages": 1 }. None of these responses include the secret — it is returned only on create.

Complete example

An end-to-end handler with verification, idempotency, and background processing:
import os
from fastapi import FastAPI, Request, BackgroundTasks
from standardwebhooks.webhooks import Webhook

app = FastAPI()
wh = Webhook(os.getenv("WEBHOOK_SECRET"))

# Track processed events for idempotency
processed_events = set()

@app.post("/webhooks/integrations")
async def handle_webhook(request: Request, background_tasks: BackgroundTasks):
    """Handle incoming webhook events."""

    # 1. Verify signature
    try:
        payload = await request.body()
        event = wh.verify(payload, dict(request.headers))
    except Exception as e:
        print(f"Invalid signature: {e}")
        return {"error": "Unauthorized"}, 401

    # 2. Deduplicate on event id
    event_id = event["id"]
    if event_id in processed_events:
        print(f"Event {event_id} already processed")
        return {"status": "already_processed"}

    # 3. Queue for background processing
    background_tasks.add_task(process_event, event)

    # 4. Return immediately
    return {"status": "received"}

async def process_event(event: dict):
    """Process the webhook event in the background."""

    event_type = event["type"]
    data = event["data"]

    try:
        if event_type == "asset.processing.completed":
            fetch_and_store_results(data["submission_id"], data["asset_id"])
            notify_user(data["asset_id"], "complete")
        elif event_type == "asset.processing.failed":
            save_error(data["asset_id"], data.get("error_message"))
            notify_user(data["asset_id"], "failed")

        processed_events.add(event["id"])
    except Exception as e:
        print(f"Error processing event: {e}")
        save_failed_webhook(event, str(e))

Best practices

  • Verify signatures on every webhook with the subscription secret.
  • Respond quickly and process asynchronously in the background.
  • Deduplicate on the event id to handle repeated deliveries.
  • Fetch issues or topics by asset ID — the payload carries no inline results.
  • Log all webhook events for debugging.
  • Monitor delivery status via the deliveries endpoint or the control plane.