index.AlgorandSubscriber
Handles the logic for subscribing to the Algorand blockchain and emitting events.
• new AlgorandSubscriber(config, algod, indexer?): AlgorandSubscriber
Create a new AlgorandSubscriber.
| Name | Type | Description |
|---|---|---|
| config | AlgorandSubscriberConfig | The subscriber configuration |
| algod | AlgodClient | An algod client |
| indexer? | IndexerClient | An (optional) indexer client; only needed if subscription.syncBehaviour is catchup-with-indexer |
• Private abortController: AbortController
• Private algod: AlgodClient
• Private config: AlgorandSubscriberConfig
• Private Readonly errorEventName: "error"
• Private eventEmitter: AsyncEventEmitter
• Private filterNames: string[]
• Private indexer: undefined | IndexerClient
• Private startPromise: undefined | Promise<void>
• Private started: boolean = false
▸ defaultErrorHandler(error): never
| Name | Type |
|---|---|
| error | unknown |
never
▸ on<T>(filterName, listener): AlgorandSubscriber
Register an event handler to run on every subscribed transaction matching the given filter name.
The listener can be async and it will be awaited if so.
| Name | Type |
|---|---|
| T | SubscribedTransaction |

| Name | Type | Description |
|---|---|---|
| filterName | string | The name of the filter to subscribe to |
| listener | TypedAsyncEventListener<T> | The listener function to invoke with the subscribed event |
The subscriber so on* calls can be chained
Example
subscriber.on('my-filter', async (transaction) => { console.log(transaction.id) })
Example
new AlgorandSubscriber({filters: [{name: 'my-filter', filter: {...}, mapper: (t) => t.id}], ...}, algod)
.on<string>('my-filter', async (transactionId) => { console.log(transactionId) })
▸ onBatch<T>(filterName, listener): AlgorandSubscriber
Register an event handler to run on all subscribed transactions matching the given filter name for each subscription poll.
This is useful when you want to efficiently process / persist events in bulk rather than one-by-one.
The listener can be async and it will be awaited if so.
| Name | Type |
|---|---|
| T | SubscribedTransaction |

| Name | Type | Description |
|---|---|---|
| filterName | string | The name of the filter to subscribe to |
| listener | TypedAsyncEventListener<T[]> | The listener function to invoke with the subscribed events |
The subscriber so on* calls can be chained
Example
subscriber.onBatch('my-filter', async (transactions) => { console.log(transactions.length) })
Example
new AlgorandSubscriber({filters: [{name: 'my-filter', filter: {...}, mapper: (t) => t.id}], ...}, algod)
.onBatch<string>('my-filter', async (transactionIds) => { console.log(transactionIds) })
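The bulk-processing use case described for onBatch can be sketched as follows. This is a minimal illustration, not the library's implementation: `TxLike`, `InMemoryStore` and `makeBatchListener` are hypothetical names introduced here, and the in-memory map stands in for whatever database you actually write to.

```typescript
// Hypothetical minimal shape of a subscribed transaction; only the id is needed here.
interface TxLike {
  id: string
}

// Hypothetical in-memory store standing in for a database table.
class InMemoryStore {
  readonly rows = new Map<string, TxLike>()
  writes = 0

  // One bulk write per call, keyed by transaction id so that re-delivery is idempotent.
  async upsertMany(items: TxLike[]): Promise<void> {
    this.writes++
    for (const item of items) {
      this.rows.set(item.id, item)
    }
  }
}

// Builds a listener that persists a whole batch with a single store call.
function makeBatchListener(store: InMemoryStore): (transactions: TxLike[]) => Promise<void> {
  return async (transactions) => {
    if (transactions.length === 0) return // nothing to persist
    await store.upsertMany(transactions)
  }
}

// Usage (assumes `subscriber` is an AlgorandSubscriber configured with a 'my-filter' filter):
// subscriber.onBatch('my-filter', makeBatchListener(new InMemoryStore()))
```

Keying the write by transaction id means the same batch can be processed again after a retry without creating duplicate rows.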
▸ onBeforePoll(listener): AlgorandSubscriber
Register an event handler to run before every subscription poll.
This is useful when you want to run logic before each poll, for example to log or to start a transaction.
The listener can be async and it will be awaited if so.
| Name | Type | Description |
|---|---|---|
| listener | TypedAsyncEventListener<BeforePollMetadata> | The listener function to invoke with the pre-poll metadata |
The subscriber so on* calls can be chained
Example
subscriber.onBeforePoll(async (metadata) => { console.log(metadata.watermark) })
▸ onError(listener): AlgorandSubscriber
Register an error handler to run if an error is thrown during processing or event handling.
This is useful to handle any errors that occur and can be used to perform retries, logging or cleanup activities.
The listener can be async and it will be awaited if so.
| Name | Type | Description |
|---|---|---|
| listener | ErrorListener | The listener function to invoke with the error that was thrown |
The subscriber so on* calls can be chained
Example
subscriber.onError((error) => { console.error(error) })
Example
const maxRetries = 3
let retryCount = 0
subscriber.onError(async (error) => {
  retryCount++
  if (retryCount > maxRetries) {
    console.error(error)
    return
  }
  console.log(`Error occurred, retrying in 2 seconds (${retryCount}/${maxRetries})`)
  await new Promise((r) => setTimeout(r, 2_000))
  subscriber.start()
})
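In the retry example above the counter is never reset, so three failures spread over a long run exhaust the retries. One possible variation, sketched here under the assumption that a successful poll should clear earlier failures, keeps the count in a small helper. `RetryBudget` is a hypothetical name and is not part of the library.

```typescript
// Hypothetical helper that tracks consecutive failures against a fixed limit.
class RetryBudget {
  private failures = 0
  private readonly maxRetries: number

  constructor(maxRetries: number) {
    this.maxRetries = maxRetries
  }

  // Records a failure and reports whether another restart is still allowed.
  recordFailure(): boolean {
    this.failures++
    return this.failures <= this.maxRetries
  }

  // Clears the count; intended to be called after a successful poll.
  reset(): void {
    this.failures = 0
  }
}

// Usage (assumes `subscriber` is an AlgorandSubscriber):
// const budget = new RetryBudget(3)
// subscriber.onPoll(() => budget.reset())
// subscriber.onError(async (error) => {
//   if (!budget.recordFailure()) {
//     console.error(error)
//     return
//   }
//   await new Promise((r) => setTimeout(r, 2_000))
//   subscriber.start()
// })
```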
▸ onPoll(listener): AlgorandSubscriber
Register an event handler to run after every subscription poll.
This is useful when you want to process all subscribed transactions in a transactionally consistent manner rather than piecemeal for each filter, or when you need a hook at the end of each poll, for example to commit a transaction.
The listener can be async and it will be awaited if so.
| Name | Type | Description |
|---|---|---|
| listener | TypedAsyncEventListener<TransactionSubscriptionResult> | The listener function to invoke with the poll result |
The subscriber so on* calls can be chained
Example
subscriber.onPoll(async (pollResult) => { console.log(pollResult.subscribedTransactions.length, pollResult.syncedRoundRange) })
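The "start a transaction before the poll, commit at the end of the poll" pattern mentioned for onBeforePoll and onPoll might look like the sketch below. `PollHooks`, `PollResultLike`, `UnitOfWork` and `wireUnitOfWork` are hypothetical names used for illustration; `PollHooks` is only a structural stand-in for the two methods documented here, and `UnitOfWork` stands in for a real database transaction.

```typescript
// Hypothetical minimal shape of a poll result; only the field used below.
interface PollResultLike {
  subscribedTransactions: unknown[]
}

// Structural stand-in for the two hook methods used in this sketch.
interface PollHooks {
  onBeforePoll(listener: (metadata: unknown) => void | Promise<void>): PollHooks
  onPoll(listener: (result: PollResultLike) => void | Promise<void>): PollHooks
}

// Hypothetical unit of work standing in for a database transaction.
class UnitOfWork {
  open = false
  committed: number[] = []

  begin(): void {
    this.open = true
  }

  // Records how many transactions were handled in the poll and closes the unit.
  commit(count: number): void {
    if (!this.open) throw new Error('commit called without begin')
    this.committed.push(count)
    this.open = false
  }
}

// Opens the unit of work before each poll and commits it after the poll.
function wireUnitOfWork(hooks: PollHooks, uow: UnitOfWork): PollHooks {
  return hooks
    .onBeforePoll(() => uow.begin())
    .onPoll((result) => uow.commit(result.subscribedTransactions.length))
}
```

Because both registrations return the subscriber, they chain in the same way as the other on* calls.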
▸ pollOnce(): Promise<TransactionSubscriptionResult>
Execute a single subscription poll.
This is useful when executing in the context of a process triggered by a recurring schedule / cron.
Promise<TransactionSubscriptionResult>
The poll result
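For the scheduled / cron scenario, a single run could be wrapped as in the sketch below. `PollOnceCapable` and `runScheduledPoll` are hypothetical names; the interface only mirrors the pollOnce signature shown above, and the exit-code convention is an assumption about how the scheduler detects failure.

```typescript
// Structural stand-in for anything exposing pollOnce, such as an AlgorandSubscriber.
interface PollOnceCapable<R> {
  pollOnce(): Promise<R>
}

// Runs one poll, hands the result to `report`, and returns a process-style exit code:
// 0 on success, 1 if the poll (or the report callback) threw.
async function runScheduledPoll<R>(subscriber: PollOnceCapable<R>, report: (result: R) => void): Promise<number> {
  try {
    const result = await subscriber.pollOnce()
    report(result)
    return 0
  } catch (error) {
    console.error('Poll failed', error)
    return 1
  }
}

// Usage in a Node.js script invoked by cron (assumes `subscriber` is an AlgorandSubscriber):
// runScheduledPoll(subscriber, (r) => console.log(r.subscribedTransactions.length))
//   .then((code) => process.exit(code))
```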
▸ start(inspect?, suppressLog?): void
Start the subscriber in a loop until stop is called.
This is useful when running in the context of a long-running process / container.
| Name | Type | Description |
|---|---|---|
| inspect? | (pollResult: TransactionSubscriptionResult) => void | A function that is called for each poll so the inner workings can be inspected / logged / etc. |
| suppressLog? | boolean | - |
void
Nothing is returned. To wait for the subscriber to finish, await the promise returned by stop.
▸ stop(reason): Promise<void>
Stops the subscriber if previously started via start.
| Name | Type | Description |
|---|---|---|
| reason | unknown | The reason the subscriber is being stopped |
Promise<void>
A promise that can be awaited to ensure the subscriber has finished stopping
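In a long-running process, stop is typically called from a shutdown signal handler. The sketch below shows one way to make sure stop is invoked only once even if several signals arrive. `Stoppable` and `makeShutdownHandler` are hypothetical names; the interface only mirrors the stop signature shown above.

```typescript
// Structural stand-in for anything exposing stop, such as an AlgorandSubscriber.
interface Stoppable {
  stop(reason: unknown): Promise<void>
}

// Returns a handler that calls stop on first use and reuses the same promise afterwards.
function makeShutdownHandler(subscriber: Stoppable): (reason: unknown) => Promise<void> {
  let stopping: Promise<void> | undefined
  return (reason) => {
    if (stopping === undefined) {
      stopping = subscriber.stop(reason)
    }
    return stopping
  }
}

// Usage in Node.js (assumes `subscriber` was started via start()):
// const shutdown = makeShutdownHandler(subscriber)
// process.on('SIGINT', () => shutdown('SIGINT').then(() => process.exit(0)))
// process.on('SIGTERM', () => shutdown('SIGTERM').then(() => process.exit(0)))
```

Awaiting the returned promise before exiting gives the current poll a chance to finish stopping.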