Custom Integration

Build a custom integration between storq.io and your application using webhooks and the REST API. This guide covers common integration patterns and best practices.

Integration Architecture

A typical storq.io integration has three components:

  1. API Client: Send orders and manage products
  2. Webhook Receiver: Receive real-time event notifications
  3. Sync Service: Keep inventory synchronized

Basic Integration Flow

1. Order Creation

When a customer places an order in your system, create it in storq.io:

# Create an order. Passing -d makes curl send the JSON below as a POST body.
curl https://storq.io/api/v1/orders \
  -H "Authorization: Bearer fsk_live_..." \
  -H "Content-Type: application/json" \
  -d '{
    "customer_email": "customer@example.com",
    "external_id": "order_123",
    "line_items": [
      {
        "sku": "PRODUCT-001",
        "quantity": 2
      }
    ]
  }'
require "net/http"
require "uri"
require "json"

# Create the order in storq.io by POSTing its JSON representation.
uri = URI("https://storq.io/api/v1/orders")
http = Net::HTTP.new(uri.host, uri.port)
http.use_ssl = true

request = Net::HTTP::Post.new(uri)
request["Authorization"] = "Bearer fsk_live_..."
request["Content-Type"] = "application/json"
request.body = {
  customer_email: "customer@example.com",
  external_id: "order_123",
  line_items: [{ sku: "PRODUCT-001", quantity: 2 }]
}.to_json

response = http.request(request)
# Parsed JSON body of the API response.
order = JSON.parse(response.body)
// Create the order in storq.io by POSTing its JSON representation.
const response = await fetch("https://storq.io/api/v1/orders", {
  method: "POST",
  headers: {
    "Authorization": "Bearer fsk_live_...",
    "Content-Type": "application/json"
  },
  body: JSON.stringify({
    customer_email: "customer@example.com",
    external_id: "order_123",
    line_items: [
      { sku: "PRODUCT-001", quantity: 2 }
    ]
  })
});

// Parsed JSON body of the API response.
const order = await response.json();
import requests

# Create the order in storq.io; `json=` serializes the payload and sets the
# JSON Content-Type header.
response = requests.post(
    "https://storq.io/api/v1/orders",
    headers={"Authorization": "Bearer fsk_live_..."},
    json={
        "customer_email": "customer@example.com",
        "external_id": "order_123",
        "line_items": [
            {"sku": "PRODUCT-001", "quantity": 2}
        ]
    }
)

# Parsed JSON body of the API response.
order = response.json()

2. Monitor Order Status

Poll the order status endpoint or use webhooks:

# Fetch the current status of order 123.
curl --header "Authorization: Bearer fsk_live_..." \
  https://storq.io/api/v1/orders/123/status
require "net/http"
require "uri"
require "json" # needed for JSON.parse below

# Fetch the current status of order 123.
uri = URI("https://storq.io/api/v1/orders/123/status")
http = Net::HTTP.new(uri.host, uri.port)
http.use_ssl = true

request = Net::HTTP::Get.new(uri)
request["Authorization"] = "Bearer fsk_live_..."

response = http.request(request)
# Parsed JSON body of the status response.
status = JSON.parse(response.body)
// Fetch the current status of order 123.
const response = await fetch(
  "https://storq.io/api/v1/orders/123/status",
  { headers: { "Authorization": "Bearer fsk_live_..." } }
);

// Parsed JSON body of the status response.
const status = await response.json();
import requests

# Fetch the current status of order 123.
response = requests.get(
    url="https://storq.io/api/v1/orders/123/status",
    headers={"Authorization": "Bearer fsk_live_..."},
)

# Parsed JSON body of the status response.
status = response.json()

3. Update Your System

When you receive a webhook or detect a status change, update your application:

# Marks the matching local order as shipped and queues the shipment email.
#
# @param storq_order [Hash] storq.io order payload; the "external_id",
#   "tracking_number" and "shipped_at" keys are read
# @raise [RuntimeError] if no local order has the payload's external_id
def handle_order_shipped(storq_order)
  external_id = storq_order["external_id"]
  order = Order.find_by(external_id: external_id)
  # Fail with a descriptive error instead of a NoMethodError on nil.
  raise "No local order for external_id #{external_id.inspect}" unless order

  order.update!(
    status: "shipped",
    tracking_number: storq_order["tracking_number"],
    shipped_at: storq_order["shipped_at"]
  )

  OrderMailer.shipped_notification(order).deliver_later
end
/**
 * Marks the local order as shipped and emails the customer the tracking number.
 *
 * @param {object} storqOrder - storq.io order payload; external_id,
 *   tracking_number, shipped_at and customer_email are read.
 */
async function handleOrderShipped(storqOrder) {
  await database.orders.update(storqOrder.external_id, {
    status: 'shipped',
    tracking_number: storqOrder.tracking_number,
    shipped_at: storqOrder.shipped_at
  });

  await emailService.send({
    to: storqOrder.customer_email,
    template: 'order_shipped',
    data: {
      tracking_number: storqOrder.tracking_number
    }
  });
}
def handle_order_shipped(storq_order):
    """Mark the matching local order as shipped and notify the customer.

    Args:
        storq_order: storq.io order payload; the "external_id",
            "tracking_number" and "shipped_at" keys are read.
    """
    order = Order.objects.get(
        external_id=storq_order["external_id"]
    )
    order.status = "shipped"
    order.tracking_number = storq_order["tracking_number"]
    order.shipped_at = storq_order["shipped_at"]
    order.save()

    send_shipped_notification(order)

Inventory Synchronization

Real-Time Sync with Webhooks

Subscribe to stock events to update inventory in real-time:

# Copies the quantity from a stock event onto the local product with the same SKU.
#
# @param event [Hash] webhook event; event["data"]["object"] must carry
#   "sku" and "quantity"
# @return [Object, nil] result of the update, or nil when no local product
#   has that SKU (the event is then ignored)
def handle_stock_updated(event)
  stock = event.dig("data", "object")
  # Safe navigation: an unknown SKU is skipped instead of raising on nil.
  Product.find_by(sku: stock["sku"])&.update!(inventory: stock["quantity"])
end
// Webhook receiver: copies stock.updated quantities onto local products and
// acknowledges every event with 200.
app.post('/webhooks/storq.io', async (req, res) => {
  const { type, data } = req.body;

  if (type !== 'stock.updated') {
    res.sendStatus(200);
    return;
  }

  const stock = data.object;
  await database.products.update(
    { sku: stock.sku },
    { inventory: stock.quantity }
  );

  res.sendStatus(200);
});
@app.route('/webhooks/storq.io', methods=['POST'])
def handle_webhook():
    """Copy stock.updated quantities onto local products; always answer 200."""
    event = request.get_json()

    if event['type'] != 'stock.updated':
        return '', 200

    stock = event['data']['object']
    sku, quantity = stock['sku'], stock['quantity']
    Product.objects.filter(sku=sku).update(inventory=quantity)

    return '', 200

Periodic Sync (Fallback)

Run a periodic sync job to ensure consistency:

# Background job that pulls the stock list from storq.io and copies each
# quantity onto the local Product with the same SKU.
class InventorySyncJob < ApplicationJob
  queue_as :default

  # Fetches /stock and updates every Product whose SKU appears in the
  # response; SKUs without a local Product are skipped.
  def perform
    stock_uri = URI("https://storq.io/api/v1/stock")

    connection = Net::HTTP.new(stock_uri.host, stock_uri.port)
    connection.use_ssl = true

    stock_request = Net::HTTP::Get.new(stock_uri)
    stock_request["Authorization"] = "Bearer #{ENV['STORQ_API_KEY']}"

    payload = JSON.parse(connection.request(stock_request).body)

    payload["stock"].each do |entry|
      product = Product.find_by(sku: entry["sku"])
      next if product.nil?

      product.update!(
        inventory: entry["quantity"],
        last_synced_at: Time.current
      )
    end
  end
end
/**
 * Pulls the stock list from storq.io and writes each quantity, plus a sync
 * timestamp, onto the product with the same SKU.
 */
async function syncInventory() {
  const authHeaders = {
    'Authorization': `Bearer ${process.env.STORQ_API_KEY}`
  };
  const response = await fetch('https://storq.io/api/v1/stock', {
    headers: authHeaders
  });

  const payload = await response.json();

  for (const { sku, quantity } of payload.stock) {
    await database.products.update(
      { sku },
      { inventory: quantity, last_synced: new Date() }
    );
  }
}

// Re-run the sync every five minutes.
setInterval(syncInventory, 5 * 60 * 1000);
import os
import time
from datetime import datetime

import requests
import schedule


def sync_inventory():
    """Pull the stock list from storq.io and copy each quantity, plus a sync
    timestamp, onto the product with the same SKU."""
    response = requests.get(
        "https://storq.io/api/v1/stock",
        headers={
            "Authorization": f"Bearer {os.environ['STORQ_API_KEY']}"
        }
    )

    for item in response.json()["stock"]:
        Product.objects.filter(sku=item["sku"]).update(
            inventory=item["quantity"],
            last_synced=datetime.now()
        )


schedule.every(5).minutes.do(sync_inventory)

# `schedule` only runs due jobs when run_pending() is called, so poll it.
while True:
    schedule.run_pending()
    time.sleep(1)

Product Management

Creating Products

# Creates a product in storq.io.
#
# @param name [String] product name sent as "name"
# @param sku [String] product SKU sent as "sku"
# @return [Object] parsed JSON body of the API response
def create_product(name:, sku:)
  endpoint = URI("https://storq.io/api/v1/products")

  post = Net::HTTP::Post.new(endpoint)
  post["Authorization"] = "Bearer #{ENV['STORQ_API_KEY']}"
  post["Content-Type"] = "application/json"
  post.body = { name: name, sku: sku }.to_json

  connection = Net::HTTP.new(endpoint.host, endpoint.port)
  connection.use_ssl = true

  JSON.parse(connection.request(post).body)
end
/**
 * Creates a product in storq.io.
 *
 * @param {{name: string, sku: string}} product - only name and sku are sent.
 * @returns {Promise<any>} parsed JSON body of the API response.
 */
async function createProduct(product) {
  const { name, sku } = product;

  const requestOptions = {
    method: 'POST',
    headers: {
      'Authorization': `Bearer ${process.env.STORQ_API_KEY}`,
      'Content-Type': 'application/json'
    },
    body: JSON.stringify({ name, sku })
  };

  const response = await fetch('https://storq.io/api/v1/products', requestOptions);
  return response.json();
}
def create_product(name, sku):
    """Create a product in storq.io and return the parsed JSON response."""
    auth_headers = {
        "Authorization": f"Bearer {os.environ['STORQ_API_KEY']}"
    }
    payload = {"name": name, "sku": sku}

    response = requests.post(
        "https://storq.io/api/v1/products",
        headers=auth_headers,
        json=payload,
    )
    return response.json()

Error Handling

Retry Logic

# Sends +request+ to the host in +uri+ and returns the parsed JSON body,
# retrying on rate limits (HTTP 429) and on any other failure.
#
# @param uri [URI] target; host, port and scheme are read from it
# @param request [Net::HTTPRequest] prepared request object
# @param retries [Integer] maximum number of attempts (must be positive)
# @return [Object] parsed JSON of the first successful response
# @raise [ArgumentError] if +retries+ is not positive
# @raise [StandardError] the last failure once every attempt is used up
def api_request(uri, request, retries: 3)
  raise ArgumentError, "retries must be positive" unless retries.positive?

  retries.times do |attempt|
    final_attempt = attempt == retries - 1

    begin
      response = Net::HTTP.start(uri.host, uri.port, use_ssl: uri.scheme == "https") do |http|
        http.request(request)
      end

      if response.code == "429"
        # Without this, a 429 on the last attempt would fall out of the loop
        # and the method would return the attempt count instead of failing.
        raise "HTTP 429" if final_attempt

        # Retry-After may be absent or an HTTP-date; fall back to 5 seconds.
        sleep(Integer(response["Retry-After"], exception: false) || 5)
        next
      end

      raise "HTTP #{response.code}" unless response.is_a?(Net::HTTPSuccess)
      return JSON.parse(response.body)
    rescue StandardError
      raise if final_attempt

      sleep(2**attempt) # exponential backoff: 1s, 2s, 4s, ...
    end
  end
end
// Resolves after `ms` milliseconds.
const sleep = (ms) => new Promise((resolve) => setTimeout(resolve, ms));

/**
 * Fetches `url` and resolves with the parsed JSON body, retrying on rate
 * limits (HTTP 429) and on any other failure.
 *
 * @param {string} url - request URL.
 * @param {object} options - options passed straight to fetch().
 * @param {number} [retries=3] - maximum number of attempts.
 * @returns {Promise<any>} parsed JSON of the first successful response.
 * @throws {Error} the last failure once every attempt is used up.
 */
async function apiRequest(url, options, retries = 3) {
  for (let i = 0; i < retries; i++) {
    const lastAttempt = i === retries - 1;

    try {
      const response = await fetch(url, options);

      if (response.status === 429) {
        // Without this, the loop would end and the function would silently
        // resolve to undefined after the final rate-limited attempt.
        if (lastAttempt) throw new Error('HTTP 429');

        // Retry-After may be absent or an HTTP-date; fall back to 5 seconds.
        const header = response.headers.get('Retry-After');
        const seconds = header === null ? NaN : Number(header);
        await sleep((Number.isFinite(seconds) ? seconds : 5) * 1000);
        continue;
      }

      if (!response.ok) {
        throw new Error(`HTTP ${response.status}`);
      }

      // Await here so a body-parsing failure is handled by the catch below.
      return await response.json();
    } catch (error) {
      if (lastAttempt) throw error;
      await sleep(Math.pow(2, i) * 1000); // exponential backoff
    }
  }
}
import time

def api_request(url, method="GET", retries=3, **kwargs):
    """Call the API and return the parsed JSON body, retrying on rate limits
    (HTTP 429) and on any other failure.

    Args:
        url: request URL.
        method: HTTP method name; the matching `requests` function is used.
        retries: maximum number of attempts.
        **kwargs: passed straight to the `requests` call.

    Raises:
        Exception: the last failure once every attempt is used up.
    """
    for i in range(retries):
        last_attempt = i == retries - 1
        try:
            response = getattr(requests, method.lower())(url, **kwargs)

            if response.status_code == 429:
                # Without this, the loop would end and the function would
                # silently return None after the final rate-limited attempt.
                if last_attempt:
                    response.raise_for_status()
                retry_after = int(
                    response.headers.get("Retry-After", 5)
                )
                time.sleep(retry_after)
                continue

            response.raise_for_status()
            return response.json()
        except Exception:
            if last_attempt:
                raise
            time.sleep(2 ** i)  # exponential backoff

Idempotency

Use external_id to prevent duplicate orders:

# Creates the order in storq.io unless one with the same external_id exists.
#
# The lookup and the create are separate requests, so two concurrent callers
# can still both create the order.
#
# @param order [#id] local order; its id is used as the external_id
# @return [Object] the first existing storq.io order, or the result of
#   create_order(order) when none exists
def create_order_safely(order)
  uri = URI("https://storq.io/api/v1/orders")
  # Encode the id so spaces, "&", "#" etc. cannot corrupt the query string.
  uri.query = URI.encode_www_form(external_id: order.id)

  http = Net::HTTP.new(uri.host, uri.port)
  http.use_ssl = true

  request = Net::HTTP::Get.new(uri)
  request["Authorization"] = "Bearer #{ENV['STORQ_API_KEY']}"

  existing = JSON.parse(http.request(request).body)
  return existing.first if existing.any?

  create_order(order)
end
/**
 * Creates the order in storq.io unless one with the same external_id exists.
 *
 * The lookup and the create are separate requests, so two concurrent callers
 * can still both create the order.
 *
 * @param {{id: string|number}} order - local order; id is the external_id.
 * @returns {Promise<any>} the first existing order, or the result of
 *   createOrder(order) when none exists.
 */
async function createOrderSafely(order) {
  // URLSearchParams encodes the id so it cannot corrupt the query string.
  const query = new URLSearchParams({ external_id: order.id });

  const existing = await fetch(
    `https://storq.io/api/v1/orders?${query}`,
    {
      headers: {
        'Authorization': `Bearer ${process.env.STORQ_API_KEY}`
      }
    }
  );

  const orders = await existing.json();
  if (orders.length > 0) {
    return orders[0];
  }

  return await createOrder(order);
}
def create_order_safely(order):
    """Create the order in storq.io unless one with the same external_id exists.

    The lookup and the create are separate requests, so two concurrent callers
    can still both create the order.

    Returns:
        The first existing storq.io order, or the result of
        create_order(order) when none exists.
    """
    response = requests.get(
        "https://storq.io/api/v1/orders",
        # `params` URL-encodes the id so it cannot corrupt the query string.
        params={"external_id": order.id},
        headers={
            "Authorization": f"Bearer {os.environ['STORQ_API_KEY']}"
        }
    )

    existing = response.json()
    if existing:
        return existing[0]

    return create_order(order)

Complete Integration Example

Here's a complete integration example:

require "net/http"
require "uri"
require "json"

# Minimal client for the storq.io REST API.
class StorqClient
  BASE_URL = "https://storq.io/api/v1"

  # Raised when the API answers with a non-2xx status.
  class Error < StandardError; end

  # @param api_key [String] sent as the Bearer token on every request
  def initialize(api_key)
    @api_key = api_key
  end

  # @param order [Hash] order attributes, sent as the JSON request body
  # @return [Object] parsed JSON response
  def create_order(order)
    request(:post, "/orders", body: order)
  end

  # @param sku [String] SKU to look up; URL-encoded into the query string
  # @return [Object] parsed JSON response
  def get_stock(sku)
    request(:get, "/stock?#{URI.encode_www_form(sku: sku)}")
  end

  private

  # Performs one HTTPS request against BASE_URL and parses the JSON reply.
  #
  # @param method [Symbol] HTTP verb, e.g. :get or :post
  # @param path [String] path (and query) appended to BASE_URL
  # @param body [Object, nil] serialized with to_json when given
  # @raise [StorqClient::Error] on any non-2xx response
  def request(method, path, body: nil)
    uri = URI("#{BASE_URL}#{path}")
    http = Net::HTTP.new(uri.host, uri.port)
    http.use_ssl = true

    # :post -> Net::HTTP::Post, :get -> Net::HTTP::Get, ...
    klass = Net::HTTP.const_get(method.to_s.capitalize)
    req = klass.new(uri)
    req["Authorization"] = "Bearer #{@api_key}"
    req["Content-Type"] = "application/json"
    req.body = body.to_json if body

    response = http.request(req)
    # Fail loudly instead of handing an error payload back as a result.
    unless response.is_a?(Net::HTTPSuccess)
      raise Error, "API Error: #{response.code} #{response.message}"
    end

    JSON.parse(response.body)
  end
end

# Build a client from the API key in the environment and submit an order.
client = StorqClient.new(ENV["STORQ_API_KEY"])

order = client.create_order(
  customer_email: "customer@example.com",
  external_id: "order_123",
  line_items: [{ sku: "MOUSE-001", quantity: 2 }]
)
const express = require('express');
const crypto = require('crypto');

const app = express();
// Keep the exact request bytes alongside the parsed body: the webhook
// signature is checked against the raw payload, and re-serializing the parsed
// JSON is not guaranteed to reproduce it byte for byte.
app.use(express.json({
  verify: (req, _res, buf) => {
    req.rawBody = buf;
  }
}));

const STORQ_API_KEY = process.env.STORQ_API_KEY;
const WEBHOOK_SECRET = process.env.STORQ_WEBHOOK_SECRET;

// Minimal client for the storq.io REST API.
class StorqClient {
  constructor(apiKey) {
    this.apiKey = apiKey;
    this.baseURL = 'https://storq.io/api/v1';
  }

  // Sends an authenticated JSON request; throws on any non-2xx response.
  async request(path, options = {}) {
    const response = await fetch(`${this.baseURL}${path}`, {
      ...options,
      headers: {
        'Authorization': `Bearer ${this.apiKey}`,
        'Content-Type': 'application/json',
        ...options.headers
      }
    });

    if (!response.ok) {
      throw new Error(`API Error: ${response.statusText}`);
    }

    return response.json();
  }

  async createOrder(order) {
    return this.request('/orders', {
      method: 'POST',
      body: JSON.stringify(order)
    });
  }

  async getStock(sku) {
    // Encode the SKU so it cannot corrupt the query string.
    return this.request(`/stock?sku=${encodeURIComponent(sku)}`);
  }
}

const storqClient = new StorqClient(STORQ_API_KEY);

// Forwards an order from this application to storq.io.
app.post('/orders', async (req, res) => {
  try {
    const order = await storqClient.createOrder({
      customer_email: req.body.email,
      external_id: req.body.id,
      line_items: req.body.items
    });

    res.json(order);
  } catch (error) {
    res.status(500).json({ error: error.message });
  }
});

// Returns true when `signature` is the hex HMAC-SHA256 of `rawBody`.
function isValidSignature(rawBody, signature) {
  if (typeof signature !== 'string' || !rawBody) {
    return false;
  }

  const expected = Buffer.from(
    crypto.createHmac('sha256', WEBHOOK_SECRET).update(rawBody).digest('hex')
  );
  const received = Buffer.from(signature);

  // timingSafeEqual throws on length mismatch, so compare lengths first.
  return expected.length === received.length &&
    crypto.timingSafeEqual(expected, received);
}

// Webhook receiver. handleOrderShipped and handleStockUpdated must be defined
// by the application.
app.post('/webhooks/storq.io', async (req, res) => {
  // Node lower-cases incoming header names.
  if (!isValidSignature(req.rawBody, req.headers['storq-signature'])) {
    return res.sendStatus(401);
  }

  const event = req.body;

  try {
    switch (event.type) {
      case 'order.shipped':
        await handleOrderShipped(event.data.object);
        break;
      case 'stock.updated':
        await handleStockUpdated(event.data.object);
        break;
    }
  } catch (error) {
    // Report handler failures instead of leaving an unhandled rejection.
    console.error('Webhook handler failed:', error);
    return res.sendStatus(500);
  }

  res.sendStatus(200);
});

app.listen(3000, () => {
  console.log('Integration running on port 3000');
});
import hmac
import hashlib
import json
import os
import requests as http_requests
from flask import Flask, request, jsonify

app = Flask(__name__)

STORQ_API_KEY = os.environ['STORQ_API_KEY']
WEBHOOK_SECRET = os.environ['STORQ_WEBHOOK_SECRET']
BASE_URL = "https://storq.io/api/v1"

class StorqClient:
    """Minimal client for the storq.io REST API."""

    def __init__(self, api_key):
        self.api_key = api_key
        self.headers = {
            "Authorization": f"Bearer {api_key}",
            "Content-Type": "application/json"
        }

    def create_order(self, order):
        """POST the order dict as JSON; raises on a non-2xx response."""
        response = http_requests.post(
            f"{BASE_URL}/orders",
            headers=self.headers,
            json=order
        )
        response.raise_for_status()
        return response.json()

    def get_stock(self, sku):
        """Look up stock for one SKU; raises on a non-2xx response."""
        response = http_requests.get(
            f"{BASE_URL}/stock",
            # `params` URL-encodes the SKU so it cannot corrupt the query.
            params={"sku": sku},
            headers=self.headers
        )
        response.raise_for_status()
        return response.json()

storq_client = StorqClient(STORQ_API_KEY)

@app.route('/orders', methods=['POST'])
def create_order():
    """Forward an order from this application to storq.io."""
    data = request.get_json()
    order = storq_client.create_order({
        "customer_email": data["email"],
        "external_id": data["id"],
        "line_items": data["items"]
    })
    return jsonify(order)

@app.route('/webhooks/storq.io', methods=['POST'])
def handle_webhook():
    """Verify the webhook signature, then dispatch on the event type.

    handle_order_shipped and handle_stock_updated must be defined by the
    application.
    """
    payload = request.get_data()  # raw bytes, exactly as signed
    # Default to '' so a missing header is rejected with 401, not a TypeError.
    signature = request.headers.get('Storq-Signature', '')

    expected = hmac.new(
        WEBHOOK_SECRET.encode(),
        payload,
        hashlib.sha256
    ).hexdigest()

    # Compare as bytes: compare_digest rejects non-ASCII str arguments.
    if not hmac.compare_digest(expected.encode(), signature.encode()):
        return '', 401

    event = json.loads(payload)

    if event['type'] == 'order.shipped':
        handle_order_shipped(event['data']['object'])
    elif event['type'] == 'stock.updated':
        handle_stock_updated(event['data']['object'])

    return '', 200

if __name__ == '__main__':
    app.run(port=3000)

Best Practices

Testing Your Integration

Test Environment

Use test mode API keys for development:

# The two assignments below are alternatives: configure exactly one
# STORQ_API_KEY per environment.

# Test API Key (safe to use in development)
STORQ_API_KEY=fsk_test_...

# Production API Key (use only in production)
STORQ_API_KEY=fsk_live_...

Webhook Testing

Use ngrok to test webhooks locally:

# Install ngrok
npm install -g ngrok

# Start your local server (keep it running; use a second terminal for ngrok)
node server.js

# Create tunnel to the port the server listens on
ngrok http 3000

# Use the HTTPS URL in storq.io webhook settings
# (example only: substitute the URL that ngrok prints)
https://abc123.ngrok.io/webhooks/storq.io

Need help? Join our developer community or email [email protected] for integration support.