KhipuVault Docs

Event Indexing Guide

Set up blockchain event indexing with ethers.js, Prisma, and PostgreSQL for real-time data synchronization.

Event Indexing Guide

Index KhipuVault smart contract events to your database for fast queries, analytics, and real-time updates.

Why Index Events?

  • Performance: Query database faster than RPC calls
  • Analytics: Historical data and aggregations
  • Real-time: WebSocket updates to frontend
  • Reliability: Retry failed RPC calls
  • Cost: Reduce RPC usage

Architecture

Smart Contracts (Mezo)

        │ Emit events

WebSocket Listener (ethers.js)

        │ Parse event data

Event Processor

        │ Transform & validate

PostgreSQL (Prisma)

        │ Query indexed data

REST API / GraphQL

Setup

1. Install Dependencies

pnpm add ethers@6 @prisma/client
pnpm add -D prisma

2. Prisma Schema

// schema.prisma
model Transaction {
  id          String   @id @default(cuid())
  txHash      String   @unique
  poolId      String
  userId      String
  type        String   // DEPOSIT, WITHDRAWAL, YIELD_CLAIM
  amount      BigInt
  blockNumber Int
  timestamp   DateTime
  createdAt   DateTime @default(now())

  pool        Pool     @relation(fields: [poolId], references: [id])
  user        User     @relation(fields: [userId], references: [id])

  @@index([userId])
  @@index([poolId])
  @@index([timestamp])
}

model Pool {
  id              String   @id @default(cuid())
  contractAddress String   @unique
  type            String   // individual, cooperative, etc.
  balance         BigInt
  totalYield      BigInt
  lastUpdate      DateTime
  createdAt       DateTime @default(now())

  transactions    Transaction[]
}

3. Event Listener Service

Create src/indexer/index.ts:

import { ethers } from 'ethers'
import { prisma } from '@khipu/database'
import { logger } from './logger'

// Contract addresses and ABIs
import { CONTRACTS, ABIS } from './config'

// WebSocket provider
const provider = new ethers.WebSocketProvider(
  process.env.RPC_WSS_URL || 'wss://rpc.test.mezo.org'
)

// Contract instances
const individualPool = new ethers.Contract(
  CONTRACTS.INDIVIDUAL_POOL,
  ABIS.INDIVIDUAL_POOL,
  provider
)

// Listen to Deposit events
individualPool.on('Deposit', async (user, amount, newBalance, timestamp, event) => {
  try {
    logger.info({ user, amount: amount.toString() }, 'Deposit event received')

    // Get transaction details
    const tx = await event.getTransaction()
    const block = await event.getBlock()

    // Check if already indexed (idempotency)
    const existing = await prisma.transaction.findUnique({
      where: { txHash: tx.hash }
    })

    if (existing) {
      logger.warn({ txHash: tx.hash }, 'Transaction already indexed')
      return
    }

    // Get or create user
    let dbUser = await prisma.user.findUnique({
      where: { address: user.toLowerCase() }
    })

    if (!dbUser) {
      dbUser = await prisma.user.create({
        data: { address: user.toLowerCase() }
      })
    }

    // Get pool
    const pool = await prisma.pool.findUnique({
      where: { contractAddress: CONTRACTS.INDIVIDUAL_POOL.toLowerCase() }
    })

    if (!pool) {
      throw new Error('Pool not found in database')
    }

    // Store transaction
    await prisma.transaction.create({
      data: {
        txHash: tx.hash,
        poolId: pool.id,
        userId: dbUser.id,
        type: 'DEPOSIT',
        amount: BigInt(amount.toString()),
        blockNumber: block.number,
        timestamp: new Date(block.timestamp * 1000)
      }
    })

    // Update pool balance
    await prisma.pool.update({
      where: { id: pool.id },
      data: {
        balance: BigInt(newBalance.toString()),
        lastUpdate: new Date()
      }
    })

    logger.info({ txHash: tx.hash }, 'Deposit indexed successfully')
  } catch (error) {
    logger.error({ error, user, amount: amount.toString() }, 'Failed to index deposit')
    // Implement retry logic here
  }
})

// Listen to Withdrawal events
individualPool.on('Withdrawal', async (user, amount, newBalance, timestamp, event) => {
  // Similar to deposit handler
})

// Listen to YieldClaimed events
individualPool.on('YieldClaimed', async (user, amount, timestamp, event) => {
  // Similar handler
})

logger.info('Event indexer started')

4. Error Handling & Retries

// utils/retry.ts
export async function retryWithBackoff<T>(
  fn: () => Promise<T>,
  maxRetries = 5
): Promise<T> {
  for (let i = 0; i < maxRetries; i++) {
    try {
      return await fn()
    } catch (error) {
      if (i === maxRetries - 1) throw error

      const delay = Math.min(1000 * Math.pow(2, i), 30000) // Max 30s
      logger.warn({ attempt: i + 1, delay }, 'Retrying after error')

      await new Promise(resolve => setTimeout(resolve, delay))
    }
  }

  throw new Error('Max retries exceeded')
}

// Usage in event handler
await retryWithBackoff(async () => {
  await prisma.transaction.create({ data: txData })
})

5. Handle Reconnections

provider.on('error', async (error) => {
  logger.error({ error }, 'Provider error')

  // Reconnect
  await provider.destroy()
  const newProvider = new ethers.WebSocketProvider(process.env.RPC_WSS_URL!)

  // Re-attach listeners
  setupListeners(newProvider)
})

6. Historical Event Sync

Sync past events on startup:

async function syncHistoricalEvents() {
  const currentBlock = await provider.getBlockNumber()
  const fromBlock = await getLastSyncedBlock() // From DB

  logger.info({ fromBlock, currentBlock }, 'Syncing historical events')

  // Query past events
  const filter = individualPool.filters.Deposit()
  const events = await individualPool.queryFilter(filter, fromBlock, currentBlock)

  for (const event of events) {
    // Process each event
    await processDepositEvent(event)
  }

  // Update last synced block
  await updateLastSyncedBlock(currentBlock)
}

// Run on startup
syncHistoricalEvents()

Production Patterns

Idempotency

Always check if event already indexed:

const existing = await prisma.transaction.findUnique({
  where: { txHash: event.transactionHash }
})

if (existing) {
  logger.warn('Already indexed')
  return
}

Block Reorganizations

Handle chain reorgs:

provider.on('block', async (blockNumber) => {
  // Check for reorgs
  const block = await provider.getBlock(blockNumber)
  const dbBlock = await prisma.block.findUnique({
    where: { number: blockNumber }
  })

  if (dbBlock && dbBlock.hash !== block.hash) {
    logger.warn({ blockNumber }, 'Reorg detected')

    // Rollback transactions from this block
    await prisma.transaction.deleteMany({
      where: { blockNumber: { gte: blockNumber } }
    })

    // Re-sync from this block
    await syncFromBlock(blockNumber)
  }
})

Monitoring

// Track sync status
setInterval(async () => {
  const latestBlock = await provider.getBlockNumber()
  const lastIndexed = await getLastIndexedBlock()

  const lag = latestBlock - lastIndexed

  if (lag > 10) {
    logger.warn({ lag }, 'Indexer falling behind')
  }

  // Emit metrics
  metrics.gauge('indexer.block_lag', lag)
}, 30000) // Every 30s

Testing

import { describe, it, expect } from 'vitest'

describe('Event Indexer', () => {
  it('should index deposit event', async () => {
    // Mock event
    const mockEvent = {
      args: {
        user: '0x123...',
        amount: ethers.parseUnits('100', 18),
        newBalance: ethers.parseUnits('100', 18),
        timestamp: Date.now() / 1000
      },
      transactionHash: '0xabc...',
      blockNumber: 12345
    }

    await processDepositEvent(mockEvent)

    const tx = await prisma.transaction.findUnique({
      where: { txHash: '0xabc...' }
    })

    expect(tx).toBeDefined()
    expect(tx.type).toBe('DEPOSIT')
    expect(tx.amount).toBe(100000000000000000000n)
  })
})

Deployment

Docker

# Dockerfile
FROM node:18-alpine

WORKDIR /app

COPY package*.json ./
RUN npm install

COPY . .

CMD ["node", "dist/indexer/index.js"]

Docker Compose

# docker-compose.yml
services:
  indexer:
    build: .
    environment:
      - DATABASE_URL=postgresql://user:pass@db:5432/khipu
      - RPC_WSS_URL=wss://rpc.test.mezo.org
    depends_on:
      - db
    restart: unless-stopped

Environment Variables

DATABASE_URL=postgresql://user:password@localhost:5432/khipu
RPC_WSS_URL=wss://rpc.test.mezo.org
LOG_LEVEL=info

Alternative: Subgraph (The Graph)

For more advanced indexing, consider using The Graph:

# schema.graphql
type Transaction @entity {
  id: ID!
  txHash: Bytes!
  user: User!
  pool: Pool!
  type: String!
  amount: BigInt!
  timestamp: BigInt!
}

type Pool @entity {
  id: ID!
  balance: BigInt!
  transactions: [Transaction!]! @derivedFrom(field: "pool")
}

Next Steps


Questions? Email dev@khipuvault.com or Discord #developers

On this page