Improving Neutrino's speed: Parallelizing block header download.


INTRODUCTION

One of the projects I worked on as part of my Summer of Bitcoin project is parallelizing header downloads across peers in Neutrino. I worked on this project under the mentorship of Jordi Montes, whose support and guidance helped me through this process. In this blog, I will explain the current process in which Neutrino downloads headers from peers, why that is so, and how checkpoints can help us overcome the current limitation to achieve a parallelized header download.

CURRENT NEUTRINO SYNC PROCESS FOR BLOCK HEADERS.

When a Neutrino node starts up, it begins fetching headers from its peers to stay current. As soon as a peer connects to the node, a signal is sent to the blockHandler. The blockHandler is a goroutine that manages everything involved in fetching headers so that the node stays up to date.

// blockHandler is the main handler for the block manager.  It must be run as a
// goroutine.  It processes block and inv messages in a separate goroutine from
// the peer handlers so the block (MsgBlock) messages are handled by a single
// thread without needing to lock memory data structures.  This is important
// because the block manager controls which blocks are needed and how
// the fetching should proceed.
func (b *blockManager) blockHandler() {
    defer b.wg.Done()

    candidatePeers := list.New()
out:
    for {
        // Now check peer messages and quit channels.
        select {
        case m := <-b.peerChan:
            switch msg := m.(type) {
            case *newPeerMsg:
                b.handleNewPeerMsg(candidatePeers, msg.peer)

            case *invMsg:
                b.handleInvMsg(msg)

            case *headersMsg:
                b.handleHeadersMsg(msg)

            case *donePeerMsg:
                b.handleDonePeerMsg(candidatePeers, msg.peer)

            default:
                log.Warnf("Invalid message type in block "+
                    "handler: %T", msg)
            }

        case <-b.quit:
            break out
        }
    }
    log.Trace("Block handler done")
}

Source: https://github.com/lightninglabs/neutrino/blob/42a196facefac36282e68ddf4a02c9ce4601a0b0/blockmanager.go#L1975C1-L2015C1

From the sample code above, one can see the various message types the handler works on. The message sent when a new peer has been added is newPeerMsg. It is handled by handleNewPeerMsg, which in turn calls the startSync function. startSync selects the best peer to sync with (the current algorithm picks the peer advertising the highest block height). The selected peer is assigned as the syncPeer, and a request for headers is sent to it. That peer remains the syncPeer until it disconnects, at which point handleDonePeerMsg calls startSync again and a new syncPeer is chosen. This means that only one peer at a time is used to fetch block headers for the Neutrino node.
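
As a rough illustration of that selection rule, a sync-peer picker could look like the sketch below. The peerInfo type and pickSyncPeer helper are simplified stand-ins for illustration only, not Neutrino's actual types:

// peerInfo is a hypothetical stand-in for a connected peer; the real
// code works with Neutrino's peer objects instead.
type peerInfo struct {
    addr            string
    lastBlockHeight int32
}

// pickSyncPeer mirrors the idea behind startSync: among the candidate
// peers, pick the one advertising the highest block height.
func pickSyncPeer(candidates []*peerInfo) *peerInfo {
    var best *peerInfo
    for _, p := range candidates {
        if best == nil || p.lastBlockHeight > best.lastBlockHeight {
            best = p
        }
    }
    return best
}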

IMPLEMENTING PARALLELIZED HEADER DOWNLOAD.

To fetch headers from a peer, the node sends it a GetHeaders message. Let us take a closer look at this message.

type MsgGetHeaders struct {
    ProtocolVersion    uint32
    BlockLocatorHashes []*chainhash.Hash
    HashStop           chainhash.Hash
}

The above is the Golang struct representing the MsgGetHeaders message. The ProtocolVersion is data already available in the chain service, so let us take a closer look at the last two fields, which change from request to request.

  1. BlockLocatorHashes

    As can be seen in the struct, BlockLocatorHashes is a list of block hashes. Suppose a chain is at height 546 and needs to fetch the next set of headers after that height. The block locator would include the hash of block 546 as well as hashes of earlier blocks down to the genesis block. It does not include every hash from block 0 to 546; instead, an algorithm selects which of those block hashes go into BlockLocatorHashes, covering recent blocks densely and then stepping back ever more sparsely toward the genesis block (a sketch of this selection follows the list).

  2. HashStop

    This is the block hash of the last header the requesting peer wants to receive. Setting it to the zero hash asks the responding peer for as many headers as fit in one message.
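
To make this concrete, the sketch below shows the usual locator selection rule (recent heights included one by one, then the step size doubles back toward the genesis block) and how the resulting locator and a stop hash plug into a wire.MsgGetHeaders. The locatorHeights and buildGetHeaders helpers and the hashAtHeight lookup are illustrative assumptions, not Neutrino's code:

import (
    "github.com/btcsuite/btcd/chaincfg/chainhash"
    "github.com/btcsuite/btcd/wire"
)

// locatorHeights sketches the standard block-locator selection: the
// most recent heights are included individually, then the step doubles
// until the genesis block (height 0) is reached.
func locatorHeights(tip int32) []int32 {
    var heights []int32
    step := int32(1)
    for h := tip; h > 0; h -= step {
        heights = append(heights, h)
        // After the first ten entries, back off exponentially.
        if len(heights) >= 10 {
            step *= 2
        }
    }
    return append(heights, 0)
}

// buildGetHeaders assembles a GetHeaders request from the locator and a
// stop hash. hashAtHeight is a hypothetical lookup into the local
// header store.
func buildGetHeaders(tip int32, stop chainhash.Hash,
    hashAtHeight func(int32) *chainhash.Hash) *wire.MsgGetHeaders {

    msg := wire.NewMsgGetHeaders()
    msg.HashStop = stop
    for _, height := range locatorHeights(tip) {
        _ = msg.AddBlockLocatorHash(hashAtHeight(height))
    }
    return msg
}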

Leveraging checkpoints, we can create a batch of multiple GetHeaders requests and send them to multiple peers for syncing.

    Checkpoints: []Checkpoint{
        {11111, newHashFromStr("0000000069e244f73d78e8fd29ba2fd2ed618bd6fa2ee92559f542fdb26e7c1d")},
        {33333, newHashFromStr("000000002dd5588a74784eaa7ab0507a18ad16a236e7b1ce69f00d7ddfb5d0a6")},
        {74000, newHashFromStr("0000000000573993a3c9e41ce34471c079dcf5f52a0e824a81e7f953b8661a20")},
        {105000, newHashFromStr("00000000000291ce28027faea320c8d2b054b2e0fe44a773f3eefb151d6bdc97")},
        {134444, newHashFromStr("00000000000005b12ffd4cd315cd34ffd4a594f430ac814c91184a0d42d2b0fe")},
        {168000, newHashFromStr("000000000000099e61ea72015e79632f216fe6cb33d7899acb35b75c8303b763")},
        {193000, newHashFromStr("000000000000059f452a5f7340de6682a977387c17010ff6e6c3bd83ca8b1317")},
        {210000, newHashFromStr("000000000000048b95347e83192f69cf0366076336c639f9b7228e9ba171342e")},
        {216116, newHashFromStr("00000000000001b4f4b433e81ee46494af945cf96014816a4e2370f11b23df4e")},
        {225430, newHashFromStr("00000000000001c108384350f74090433e7fcf79a606b8e797f065b130575932")},
        {250000, newHashFromStr("000000000000003887df1f29024b06fc2200b55f8af8f35453d7be294df2d214")},
        {267300, newHashFromStr("000000000000000a83fbd660e918f218bf37edd92b748ad940483c7c116179ac")},
        {279000, newHashFromStr("0000000000000001ae8c72a0b0c301f67e3afca10e819efa9041e458e9bd7e40")},
        {300255, newHashFromStr("0000000000000000162804527c6e9b9f0563a280525f9d08c12041def0a0f3b2")},
        {319400, newHashFromStr("000000000000000021c6052e9becade189495d1c539aa37c58917305fd15f13b")},
        {343185, newHashFromStr("0000000000000000072b8bf361d01a6ba7d445dd024203fafc78768ed4368554")},
        {352940, newHashFromStr("000000000000000010755df42dba556bb72be6a32f3ce0b6941ce4430152c9ff")},
        {382320, newHashFromStr("00000000000000000a8dc6ed5b133d0eb2fd6af56203e4159789b092defd8ab2")},
        {400000, newHashFromStr("000000000000000004ec466ce4732fe6f1ed1cddc2ed4b328fff5224276e3f6f")},
        {430000, newHashFromStr("000000000000000001868b2bb3a285f3cc6b33ea234eb70facf4dcdf22186b87")},
        {460000, newHashFromStr("000000000000000000ef751bbce8e744ad303c47ece06c8d863e4d417efc258c")},
        {490000, newHashFromStr("000000000000000000de069137b17b8d5a3dfbd5b145b2dcfb203f15d0c4de90")},
        {520000, newHashFromStr("0000000000000000000d26984c0229c9f6962dc74db0a6d525f2f1640396f69c")},
        {550000, newHashFromStr("000000000000000000223b7a2298fb1c6c75fb0efc28a4c56853ff4112ec6bc9")},
        {560000, newHashFromStr("0000000000000000002c7b276daf6efb2b6aa68e2ce3be67ef925b3264ae7122")},
    },

Source: https://github.com/btcsuite/btcd/blob/40d7a0a06c7b921384558ae7549bbc09a86f3da9/chaincfg/params.go#L310-L343

The above are the checkpoints of the Bitcoin mainnet. They are trusted, verified points in the blockchain. Since each checkpoint contains a block hash, one can construct a GetHeaders message using data from the checkpoints. Without checkpoints it would be impossible to create a batch of GetHeaders requests, because we cannot obtain the block hash of a header we do not yet have, and such a hash is required for the BlockLocatorHashes and HashStop fields of the GetHeaders message. To batch these requests, I created a blockManager method called batchCheckpointedBlkHeaders.


func (b *blockManager) batchCheckpointedBlkHeaders() {
    fmt.Println("Starting batch checkpointed headers")

    var queryMsgs []*headerQuery
    curHeight := b.headerTip
    curHash := b.headerTipHash
    nextCheckpoint := b.nextCheckpoint
    nextCheckptHash := nextCheckpoint.Hash
    nextCheckptHeight := nextCheckpoint.Height

    log.Infof("Fetching set of checkpointed blockheaders from "+
        "height=%v, hash=%v\n", curHeight, curHash)

    for nextCheckpoint != nil {

        endHash := nextCheckptHash
        endHeight := nextCheckptHeight
        tmpCurHash := curHash

        queryMsg := &headerQuery{
            Message: &wire.MsgGetHeaders{
                BlockLocatorHashes: blockchain.BlockLocator([]*chainhash.Hash{&tmpCurHash}),
                HashStop:           *endHash,
            },
            startHeight:   int32(curHeight),
            initialHeight: int32(curHeight),
            startHash:     curHash,
            endHeight:     endHeight,
            initialHash:   tmpCurHash,
        }

        log.Infof("Fetching set of checkpointed blockheaders from "+
            "start_height=%v to end-height=%v", curHeight, endHash)

        queryMsgs = append(queryMsgs, queryMsg)
        curHeight = uint32(endHeight)
        curHash = *endHash

        nextCheckpoint = b.findNextHeaderCheckpoint(int32(curHeight))
        if nextCheckpoint == nil {
            break
        }

        nextCheckptHeight = nextCheckpoint.Height
        nextCheckptHash = nextCheckpoint.Hash

    }

    queryMsg := &headerQuery{
        Message: &wire.MsgGetHeaders{
            BlockLocatorHashes: blockchain.BlockLocator([]*chainhash.Hash{nextCheckptHash}),
            HashStop:           zeroHash,
        },
        startHeight:   nextCheckptHeight,
        initialHeight: nextCheckptHeight,
        startHash:     *nextCheckptHash,
        endHeight:     nextCheckptHeight + wire.MaxBlockHeadersPerMsg,
        initialHash:   *nextCheckptHash,
    }

    log.Infof("Fetching set of checkpointed blockheaders from "+
        "start_height=%v to end-height=%v", curHeight, zeroHash)

    queryMsgs = append(queryMsgs, queryMsg)

    log.Infof("Attempting to query for %v blockheader batches", len(queryMsgs))

    q := CheckpointedBlockHeadersQuery{
        blockMgr: b,
        msgs:     queryMsgs,
    }
    fmt.Println("sending query f")

    errChan := b.cfg.checkptHdrQueryDispatcher.Query(
        q.requests(), query.Cancel(b.quit), query.NoTimeout(),
        query.NoRetryMax(), query.KeepBatch(),
    )

    wm := b.cfg.checkptHdrQueryDispatcher

    defer func(wm query.WorkManager) {
        err := wm.Stop()
        if err != nil {
            log.Errorf("Unable to stop block header workmanager: %v", err)
        }
    }(wm)

    b.writeCheckptHeaders(errChan)

}

Source: https://github.com/lightninglabs/neutrino/blob/fc413d722789fd7a76dfdad640d45d10384f4670/blockmanager.go#L2162-L2258

The batchCheckpointedBlkHeaders function creates multiple requests and hands them to the work manager, which distributes them to its workers. The work manager is what Neutrino currently uses to fetch checkpointed filter headers across peers.
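
Conceptually, the dispatch step can be pictured as a simple fan-out: per-peer workers pull header queries from a shared channel, so several checkpoint ranges are in flight at once. The sketch below is a generic illustration of that pattern, not the query package's actual work manager; the sendQuery callback and string peer IDs are stand-ins, and headerQuery refers to the type used in the function above:

import (
    "log"
    "sync"
)

// dispatch fans headerQuery jobs out to one worker goroutine per peer.
// Each worker keeps pulling jobs until the shared channel is drained.
func dispatch(queries []*headerQuery, peers []string,
    sendQuery func(peer string, q *headerQuery) error) {

    jobs := make(chan *headerQuery, len(queries))
    for _, q := range queries {
        jobs <- q
    }
    close(jobs)

    var wg sync.WaitGroup
    for _, peer := range peers {
        wg.Add(1)
        go func(peer string) {
            defer wg.Done()
            for q := range jobs {
                if err := sendQuery(peer, q); err != nil {
                    // The real work manager re-queues a failed job so
                    // that another peer can pick it up.
                    log.Printf("query to %v failed: %v", peer, err)
                }
            }
        }(peer)
    }
    wg.Wait()
}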

One thing to note: although the process of fetching headers can be parallelized, the block headers still have to be validated in order. This is because of how headers are validated:

  • To validate a header, the previous-block hash in its metadata must match the hash of the block the Neutrino node sees as preceding it. For example, to validate block 546 we need to check that the previous block referenced in its header is block 545 (see the sketch after this list). The node cannot carry out this check if it does not yet have block 545. My first approach was to keep a separate header-list structure for each checkpoint region; since the hash of the block at a checkpoint height is known, subsequent blocks in that region could be validated against it. However, the second check performed during header validation sounded the death knell for that idea.

  • Proof-of-work verification is also carried out on the header: the expected difficulty is recalculated at every retarget height, the height at which the proof-of-work difficulty is adjusted. To make this adjustment, the node needs the timestamp of the block at the previous retarget height. That block usually falls outside the checkpoint region, so each region cannot validate its headers on its own.
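
In code, the first check amounts to comparing the PrevBlock field of the incoming header against the hash of the header the node already holds at the previous height. A minimal sketch of that linkage check (the difficulty recalculation is omitted):

import (
    "fmt"

    "github.com/btcsuite/btcd/wire"
)

// checkHeaderConnects verifies that header builds on prevHeader: its
// PrevBlock field must equal the hash of the header we already have at
// the preceding height. Full validation additionally recomputes the
// expected proof-of-work difficulty at every retarget height, which
// needs the timestamp of the header one retarget interval back.
func checkHeaderConnects(prevHeader, header *wire.BlockHeader) error {
    prevHash := prevHeader.BlockHash()
    if header.PrevBlock != prevHash {
        return fmt.Errorf("header %v does not connect: references %v, "+
            "expected %v", header.BlockHash(), header.PrevBlock, prevHash)
    }
    return nil
}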

To address this, I created a goroutine loop that ensures the headers are verified and written in order.
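
A rough sketch of that idea, under the assumption that each response is tagged with the heights it covers: batches that arrive early are parked in a map and only validated and written once the batch that connects to the current tip has been handled. The headerBatch type, channel, and writeBatch callback are illustrative, not the actual implementation:

import "github.com/btcsuite/btcd/wire"

// headerBatch is a hypothetical container for one checkpoint range of
// headers, tagged with the heights it covers.
type headerBatch struct {
    startHeight int32
    endHeight   int32
    headers     []*wire.BlockHeader
}

// writeInOrder receives checkpointed header batches in whatever order
// the peers deliver them, but validates and writes them strictly by
// height. Out-of-order batches are parked until their turn comes.
func writeInOrder(batches <-chan headerBatch, tipHeight int32,
    writeBatch func(headerBatch) error) error {

    pending := make(map[int32]headerBatch)
    next := tipHeight
    for batch := range batches {
        pending[batch.startHeight] = batch
        // Flush every batch that is now contiguous with the tip.
        for {
            b, ok := pending[next]
            if !ok {
                break
            }
            if err := writeBatch(b); err != nil {
                return err
            }
            delete(pending, next)
            next = b.endHeight
        }
    }
    return nil
}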

For more details about the implementation, check out the pull request.