Event Indexing Guide
Set up blockchain event indexing with ethers.js, Prisma, and PostgreSQL for real-time data synchronization.
Index KhipuVault smart contract events to your database for fast queries, analytics, and real-time updates.
Why Index Events?
- Performance: Querying the database is faster than making RPC calls
- Analytics: Historical data and aggregations
- Real-time: WebSocket updates to the frontend
- Reliability: Failed RPC calls are retried by the indexer
- Cost: Fewer RPC requests
Architecture
Smart Contracts (Mezo)
│
│ Emit events
▼
WebSocket Listener (ethers.js)
│
│ Parse event data
▼
Event Processor
│
│ Transform & validate
▼
PostgreSQL (Prisma)
│
│ Query indexed data
▼
REST API / GraphQL
Setup
1. Install Dependencies
pnpm add ethers@6 @prisma/client
pnpm add -D prisma
2. Prisma Schema
// schema.prisma
model Transaction {
  id          String   @id @default(cuid())
  txHash      String   @unique
  poolId      String
  userId      String
  type        String   // DEPOSIT, WITHDRAWAL, YIELD_CLAIM
  amount      BigInt
  blockNumber Int
  timestamp   DateTime
  createdAt   DateTime @default(now())
  pool        Pool     @relation(fields: [poolId], references: [id])
  user        User     @relation(fields: [userId], references: [id])

  @@index([userId])
  @@index([poolId])
  @@index([timestamp])
}

model Pool {
  id              String        @id @default(cuid())
  contractAddress String        @unique
  type            String        // individual, cooperative, etc.
  balance         BigInt
  totalYield      BigInt
  lastUpdate      DateTime
  createdAt       DateTime      @default(now())
  transactions    Transaction[]
}

// Minimal User model (an assumption), inferred from how the listener in step 3 looks users up by address
model User {
  id           String        @id @default(cuid())
  address      String        @unique
  createdAt    DateTime      @default(now())
  transactions Transaction[]
}
3. Event Listener Service
Create src/indexer/index.ts:
import { ethers } from 'ethers'
import { prisma } from '@khipu/database'
import { logger } from './logger'
// Contract addresses and ABIs
import { CONTRACTS, ABIS } from './config'

// WebSocket provider
const provider = new ethers.WebSocketProvider(
  process.env.RPC_WSS_URL || 'wss://rpc.test.mezo.org'
)

// Contract instances
const individualPool = new ethers.Contract(
  CONTRACTS.INDIVIDUAL_POOL,
  ABIS.INDIVIDUAL_POOL,
  provider
)

// Listen to Deposit events
individualPool.on('Deposit', async (user, amount, newBalance, timestamp, event) => {
  try {
    logger.info({ user, amount: amount.toString() }, 'Deposit event received')

    // Get transaction details
    const tx = await event.getTransaction()
    const block = await event.getBlock()

    // Check if already indexed (idempotency)
    const existing = await prisma.transaction.findUnique({
      where: { txHash: tx.hash }
    })
    if (existing) {
      logger.warn({ txHash: tx.hash }, 'Transaction already indexed')
      return
    }

    // Get or create user
    let dbUser = await prisma.user.findUnique({
      where: { address: user.toLowerCase() }
    })
    if (!dbUser) {
      dbUser = await prisma.user.create({
        data: { address: user.toLowerCase() }
      })
    }

    // Get pool
    const pool = await prisma.pool.findUnique({
      where: { contractAddress: CONTRACTS.INDIVIDUAL_POOL.toLowerCase() }
    })
    if (!pool) {
      throw new Error('Pool not found in database')
    }

    // Store transaction
    await prisma.transaction.create({
      data: {
        txHash: tx.hash,
        poolId: pool.id,
        userId: dbUser.id,
        type: 'DEPOSIT',
        amount: BigInt(amount.toString()),
        blockNumber: block.number,
        timestamp: new Date(block.timestamp * 1000)
      }
    })

    // Update pool balance
    await prisma.pool.update({
      where: { id: pool.id },
      data: {
        balance: BigInt(newBalance.toString()),
        lastUpdate: new Date()
      }
    })

    logger.info({ txHash: tx.hash }, 'Deposit indexed successfully')
  } catch (error) {
    logger.error({ error, user, amount: amount.toString() }, 'Failed to index deposit')
    // Implement retry logic here
  }
})

// Listen to Withdrawal events
individualPool.on('Withdrawal', async (user, amount, newBalance, timestamp, event) => {
  // Similar to deposit handler
})

// Listen to YieldClaimed events
individualPool.on('YieldClaimed', async (user, amount, timestamp, event) => {
  // Similar handler
})
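Because the Withdrawal and YieldClaimed handlers repeat the Deposit logic, the "Transform & validate" step from the architecture diagram could be factored into one shared function. The following is a minimal sketch under that assumption. `RawPoolEvent`, `TransactionRecord`, and `toTransactionRecord` are hypothetical names that are not part of the KhipuVault codebase, and the validation rules are illustrative rather than complete.

```typescript
// Hypothetical types for illustration; they are not part of the KhipuVault codebase.
type TxType = 'DEPOSIT' | 'WITHDRAWAL' | 'YIELD_CLAIM'

interface RawPoolEvent {
  user: string           // address as emitted by the contract
  amount: bigint         // ethers v6 decodes uint256 values as bigint
  txHash: string
  blockNumber: number
  blockTimestamp: number // seconds since the Unix epoch
}

interface TransactionRecord {
  txHash: string
  userAddress: string
  type: TxType
  amount: bigint
  blockNumber: number
  timestamp: Date
}

const ADDRESS_PATTERN = /^0x[0-9a-fA-F]{40}$/

// Validates a decoded event and converts it into the shape stored in the database.
function toTransactionRecord(type: TxType, raw: RawPoolEvent): TransactionRecord {
  if (!ADDRESS_PATTERN.test(raw.user)) {
    throw new Error(`Invalid user address: ${raw.user}`)
  }
  if (raw.amount < 0n) {
    throw new Error(`Amount must not be negative: ${raw.amount}`)
  }
  if (!Number.isInteger(raw.blockNumber) || raw.blockNumber < 0) {
    throw new Error(`Invalid block number: ${raw.blockNumber}`)
  }
  return {
    txHash: raw.txHash,
    userAddress: raw.user.toLowerCase(), // matches the lowercase lookup in the Deposit handler
    type,
    amount: raw.amount,
    blockNumber: raw.blockNumber,
    timestamp: new Date(raw.blockTimestamp * 1000), // seconds to milliseconds
  }
}
```

With a helper like this, each handler would differ only in the `type` it passes and in whether it also updates the pool balance.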
logger.info('Event indexer started')
4. Error Handling & Retries
// utils/retry.ts
import { logger } from '../logger' // assumed location; adjust to your project layout

// Retries fn with exponential backoff: 1s, 2s, 4s, ... capped at 30s
export async function retryWithBackoff<T>(
  fn: () => Promise<T>,
  maxRetries = 5
): Promise<T> {
  for (let i = 0; i < maxRetries; i++) {
    try {
      return await fn()
    } catch (error) {
      // Out of attempts: surface the last error to the caller
      if (i === maxRetries - 1) throw error
      const delay = Math.min(1000 * Math.pow(2, i), 30000) // Max 30s
      logger.warn({ attempt: i + 1, delay }, 'Retrying after error')
      await new Promise(resolve => setTimeout(resolve, delay))
    }
  }
  throw new Error('Max retries exceeded')
}
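The helper above retries every failure, including ones that cannot succeed on a second try, such as a unique-constraint violation on `txHash` when the row was already written. Below is a hedged sketch of a variant that accepts a predicate deciding which errors are worth retrying. `retryUnless`, `backoffDelay`, `isUniqueViolation`, and the option names are hypothetical; the one external fact it relies on is that Prisma reports unique-constraint violations with error code `P2002`.

```typescript
// Hypothetical variant of retryWithBackoff; names and options are illustrative.
interface RetryOptions {
  maxRetries?: number
  baseDelayMs?: number
  maxDelayMs?: number
  shouldRetry?: (error: unknown) => boolean
  sleep?: (ms: number) => Promise<void> // injectable so tests need not wait
}

// Delay before the retry that follows failed attempt `attempt` (0-based).
function backoffDelay(attempt: number, baseDelayMs = 1000, maxDelayMs = 30000): number {
  return Math.min(baseDelayMs * Math.pow(2, attempt), maxDelayMs)
}

// Prisma reports unique-constraint violations with error code P2002.
function isUniqueViolation(error: unknown): boolean {
  return (
    typeof error === 'object' &&
    error !== null &&
    (error as { code?: unknown }).code === 'P2002'
  )
}

async function retryUnless<T>(fn: () => Promise<T>, options: RetryOptions = {}): Promise<T> {
  const {
    maxRetries = 5,
    baseDelayMs = 1000,
    maxDelayMs = 30000,
    shouldRetry = (error) => !isUniqueViolation(error),
    sleep = (ms) => new Promise<void>((resolve) => setTimeout(resolve, ms)),
  } = options

  for (let attempt = 0; ; attempt++) {
    try {
      return await fn()
    } catch (error) {
      const lastAttempt = attempt >= maxRetries - 1
      // Give up when attempts are exhausted or the error is not worth retrying
      if (lastAttempt || !shouldRetry(error)) throw error
      await sleep(backoffDelay(attempt, baseDelayMs, maxDelayMs))
    }
  }
}
```

With the defaults (five attempts), the waits between attempts are 1 s, 2 s, 4 s and 8 s, so the 30 s cap only comes into play when `maxRetries` is raised. A caller that catches a `P2002` error from this wrapper can treat the event as already indexed.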
// Usage in event handler (txData is the `data` object built for prisma.transaction.create)
await retryWithBackoff(async () => {
  await prisma.transaction.create({ data: txData })
})
5. Handle Reconnections
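// setupListeners is assumed to rebuild the contract instances on the given provider
// and re-register the handlers from step 3; it is not defined in this guide.
function watchProvider(current: ethers.WebSocketProvider) {
  current.on('error', async (error) => {
    logger.error({ error }, 'Provider error')
    // Reconnect
    await current.destroy()
    const newProvider = new ethers.WebSocketProvider(process.env.RPC_WSS_URL!)
    // Re-attach listeners
    setupListeners(newProvider)
    // Watch the replacement too, so a later failure also triggers a reconnect
    watchProvider(newProvider)
  })
}
watchProvider(provider)
6. Historical Event Sync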
Sync past events on startup:
async function syncHistoricalEvents() {
  const currentBlock = await provider.getBlockNumber()
  const fromBlock = await getLastSyncedBlock() // From DB
  logger.info({ fromBlock, currentBlock }, 'Syncing historical events')

  // Query past events
  const filter = individualPool.filters.Deposit()
  const events = await individualPool.queryFilter(filter, fromBlock, currentBlock)
  for (const event of events) {
    // Process each event
    await processDepositEvent(event)
  }

  // Update last synced block
  await updateLastSyncedBlock(currentBlock)
}
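`syncHistoricalEvents` queries the whole gap between the last synced block and the chain head in a single call. Many RPC endpoints limit how many blocks one log query may span, so after a long outage the range may need to be split. A minimal sketch, assuming a chunk size of 1,000 blocks (an illustrative number, not a documented Mezo limit); `blockRanges` and `syncInChunks` are hypothetical helpers:

```typescript
// Splits [fromBlock, toBlock] into inclusive ranges of at most chunkSize blocks.
function blockRanges(
  fromBlock: number,
  toBlock: number,
  chunkSize: number
): Array<[number, number]> {
  if (!Number.isInteger(chunkSize) || chunkSize < 1) {
    throw new Error('chunkSize must be a positive integer')
  }
  const ranges: Array<[number, number]> = []
  for (let start = fromBlock; start <= toBlock; start += chunkSize) {
    ranges.push([start, Math.min(start + chunkSize - 1, toBlock)])
  }
  return ranges
}

// Processes each range in order and records progress after every chunk,
// so a crash resumes from the last completed chunk rather than from the start.
async function syncInChunks(
  fromBlock: number,
  toBlock: number,
  processRange: (from: number, to: number) => Promise<void>,
  saveCheckpoint: (block: number) => Promise<void>,
  chunkSize = 1000 // assumed value; tune to your RPC provider's limits
): Promise<void> {
  for (const [from, to] of blockRanges(fromBlock, toBlock, chunkSize)) {
    await processRange(from, to)
    await saveCheckpoint(to)
  }
}
```

Here `processRange` would wrap `individualPool.queryFilter(filter, from, to)` together with `processDepositEvent`, and `saveCheckpoint` would be `updateLastSyncedBlock`.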
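// Run on startup; log failures instead of leaving an unhandled rejection
syncHistoricalEvents().catch((error) => {
  logger.error({ error }, 'Historical sync failed')
})
Production Patterns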
Idempotency
Always check whether an event has already been indexed before writing it:
const existing = await prisma.transaction.findUnique({
  where: { txHash: event.transactionHash }
})
if (existing) {
  logger.warn('Already indexed')
  return
}
Block Reorganizations
Handle chain reorgs by comparing the hash stored for each block number with the hash the node currently reports. This snippet assumes a Block model (number, hash) that the schema in step 2 does not define:
provider.on('block', async (blockNumber) => {
  // Check for reorgs
  const block = await provider.getBlock(blockNumber)
  if (!block) return // Block not yet available from the node
  const dbBlock = await prisma.block.findUnique({
    where: { number: blockNumber }
  })
  if (dbBlock && dbBlock.hash !== block.hash) {
    logger.warn({ blockNumber }, 'Reorg detected')
    // Rollback transactions from this block
    await prisma.transaction.deleteMany({
      where: { blockNumber: { gte: blockNumber } }
    })
    // Re-sync from this block
    await syncFromBlock(blockNumber)
  }
})
Monitoring
// Track sync status
setInterval(async () => {
  try {
    const latestBlock = await provider.getBlockNumber()
    const lastIndexed = await getLastIndexedBlock()
    const lag = latestBlock - lastIndexed
    if (lag > 10) {
      logger.warn({ lag }, 'Indexer falling behind')
    }
    // Emit metrics
    metrics.gauge('indexer.block_lag', lag)
  } catch (error) {
    // A failed RPC or database call should not crash the monitor loop
    logger.error({ error }, 'Failed to compute indexer lag')
  }
}, 30000) // Every 30s
Testing
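import { describe, it, expect } from 'vitest'
import { ethers } from 'ethers'
import { prisma } from '@khipu/database'
// processDepositEvent is the handler under test; this import path is an assumption
import { processDepositEvent } from '../src/indexer/handlers'

describe('Event Indexer', () => {
  it('should index deposit event', async () => {
    // Mock event
    const mockEvent = {
      args: {
        user: '0x123...',
        amount: ethers.parseUnits('100', 18),
        newBalance: ethers.parseUnits('100', 18),
        timestamp: Math.floor(Date.now() / 1000)
      },
      transactionHash: '0xabc...',
      blockNumber: 12345
    }
    await processDepositEvent(mockEvent)
    const tx = await prisma.transaction.findUnique({
      where: { txHash: '0xabc...' }
    })
    // findUnique returns null when no row matches, so assert non-null explicitly
    expect(tx).not.toBeNull()
    expect(tx?.type).toBe('DEPOSIT')
    expect(tx?.amount).toBe(100000000000000000000n)
  })
})
Deployment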
Docker
# Dockerfile
FROM node:18-alpine
WORKDIR /app
COPY package*.json ./
RUN npm install
COPY . .
CMD ["node", "dist/indexer/index.js"]
Docker Compose
# docker-compose.yml
# Assumes a `db` PostgreSQL service is defined alongside `indexer` in this file
services:
  indexer:
    build: .
    environment:
      - DATABASE_URL=postgresql://user:pass@db:5432/khipu
      - RPC_WSS_URL=wss://rpc.test.mezo.org
    depends_on:
      - db
    restart: unless-stopped
Environment Variables
DATABASE_URL=postgresql://user:password@localhost:5432/khipu
RPC_WSS_URL=wss://rpc.test.mezo.org
LOG_LEVEL=info
Alternative: Subgraph (The Graph)
For more advanced indexing, consider using The Graph:
# schema.graphql
type Transaction @entity {
  id: ID!
  txHash: Bytes!
  user: User!
  pool: Pool!
  type: String!
  amount: BigInt!
  timestamp: BigInt!
}

type Pool @entity {
  id: ID!
  balance: BigInt!
  transactions: [Transaction!]! @derivedFrom(field: "pool")
}
Next Steps
Questions? Email dev@khipuvault.com or ask in the #developers channel on Discord.