INTRODUCTION
One of the projects I worked on as part of Summer of Bitcoin is parallelizing header downloads across peers in Neutrino. I worked on this project under the mentorship of Jordi Montes, whose support and guidance helped me through the process. In this blog post, I will explain how Neutrino currently downloads headers from peers, why it works that way, and how checkpoints can help us overcome the current limitation to achieve a parallelized header download.
CURRENT NEUTRINO SYNC PROCESS FOR BLOCK HEADERS.
When a Neutrino node starts up, it begins fetching headers from peers to stay current. As soon as a peer connects to the node, a signal is sent to the blockHandler. The blockHandler is a goroutine that manages everything involved in fetching headers, keeping the node up to date.
// blockHandler is the main handler for the block manager. It must be run as a
// goroutine. It processes block and inv messages in a separate goroutine from
// the peer handlers so the block (MsgBlock) messages are handled by a single
// thread without needing to lock memory data structures. This is important
// because the block manager controls which blocks are needed and how
// the fetching should proceed.
func (b *blockManager) blockHandler() {
    defer b.wg.Done()

    candidatePeers := list.New()
out:
    for {
        // Now check peer messages and quit channels.
        select {
        case m := <-b.peerChan:
            switch msg := m.(type) {
            case *newPeerMsg:
                b.handleNewPeerMsg(candidatePeers, msg.peer)

            case *invMsg:
                b.handleInvMsg(msg)

            case *headersMsg:
                b.handleHeadersMsg(msg)

            case *donePeerMsg:
                b.handleDonePeerMsg(candidatePeers, msg.peer)

            default:
                log.Warnf("Invalid message type in block "+
                    "handler: %T", msg)
            }

        case <-b.quit:
            break out
        }
    }

    log.Trace("Block handler done")
}
From the sample code above, one can see the various message types that the handler works on. The message sent when a new peer has been added is the newPeerMsg. It is handled by the handleNewPeerMsg function, which calls startSync. The startSync function selects the best peer to sync with (the current algorithm selects the peer with the highest block height). The selected peer is assigned as the syncPeer, and a request for headers is sent to it. The chosen peer remains the syncPeer until it disconnects, at which point handleDonePeerMsg calls startSync again and a new syncPeer is chosen. This means that only one peer at a time is used to fetch block headers for the Neutrino node.
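To make that selection rule concrete, here is a minimal sketch with made-up type and field names (Neutrino's real logic lives inside startSync and works on its own peer type): among the connected peers that are not behind our own tip, pick the one advertising the greatest block height.

package headersync

// syncCandidate is a hypothetical stand-in for a connected peer; only the
// block height it advertises matters for this sketch.
type syncCandidate struct {
    addr      string
    lastBlock int32
}

// bestSyncPeer mimics the selection rule described above: among peers that
// are not behind our own chain tip, pick the one advertising the greatest
// block height.
func bestSyncPeer(peers []*syncCandidate, ourHeight int32) *syncCandidate {
    var best *syncCandidate
    for _, p := range peers {
        if p.lastBlock < ourHeight {
            // Skip peers that are behind our own tip.
            continue
        }
        if best == nil || p.lastBlock > best.lastBlock {
            best = p
        }
    }
    return best
}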
IMPLEMENTING PARALLELIZED HEADER DOWNLOAD.
To fetch headers from a peer, a GetHeaders message is sent to that peer. Let us take a closer look at this message.
type MsgGetHeaders struct {
    ProtocolVersion    uint32
    BlockLocatorHashes []*chainhash.Hash
    HashStop           chainhash.Hash
}
The above is the Golang struct representing the MsgGetHeaders message. ProtocolVersion is data already available in the chain service, so let us take a closer look at the last two fields, which change from request to request.
BlockLocatorHashes
As can be seen in the struct, BlockLocatorHashes is a list of block hashes. Suppose a chain is at height 546 and needs to fetch the next set of headers after block 546. The block locator would include the hash of block 546 as well as hashes of earlier blocks down to the genesis block. It does not include every hash from block 0 to block 546; instead, an algorithm selects which of those hashes to include (recent blocks are included densely, then the spacing doubles on the way back to genesis).
HashStop
This is the block hash of the last block the requesting peer wants to receive.
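As a small illustration of how these fields fit together, here is a sketch that assembles such a message with btcd's wire package. The helper function is mine, not part of Neutrino; a zero-valued HashStop simply asks the peer for as many headers as it is willing to send, up to 2000 per headers message.

package headersync

import (
    "github.com/btcsuite/btcd/chaincfg/chainhash"
    "github.com/btcsuite/btcd/wire"
)

// buildGetHeaders assembles a GetHeaders request from a block locator and a
// stop hash using btcd's wire package.
func buildGetHeaders(locator []*chainhash.Hash,
    stop chainhash.Hash) (*wire.MsgGetHeaders, error) {

    msg := wire.NewMsgGetHeaders()
    msg.HashStop = stop
    for _, hash := range locator {
        // AddBlockLocatorHash errors out if the locator grows beyond the
        // protocol limit of 500 hashes.
        if err := msg.AddBlockLocatorHash(hash); err != nil {
            return nil, err
        }
    }
    return msg, nil
}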
Leveraging checkpoints, we can create a batch of multiple requests and send them to multiple peers for syncing.
Checkpoints: []Checkpoint{
{11111, newHashFromStr("0000000069e244f73d78e8fd29ba2fd2ed618bd6fa2ee92559f542fdb26e7c1d")},
{33333, newHashFromStr("000000002dd5588a74784eaa7ab0507a18ad16a236e7b1ce69f00d7ddfb5d0a6")},
{74000, newHashFromStr("0000000000573993a3c9e41ce34471c079dcf5f52a0e824a81e7f953b8661a20")},
{105000, newHashFromStr("00000000000291ce28027faea320c8d2b054b2e0fe44a773f3eefb151d6bdc97")},
{134444, newHashFromStr("00000000000005b12ffd4cd315cd34ffd4a594f430ac814c91184a0d42d2b0fe")},
{168000, newHashFromStr("000000000000099e61ea72015e79632f216fe6cb33d7899acb35b75c8303b763")},
{193000, newHashFromStr("000000000000059f452a5f7340de6682a977387c17010ff6e6c3bd83ca8b1317")},
{210000, newHashFromStr("000000000000048b95347e83192f69cf0366076336c639f9b7228e9ba171342e")},
{216116, newHashFromStr("00000000000001b4f4b433e81ee46494af945cf96014816a4e2370f11b23df4e")},
{225430, newHashFromStr("00000000000001c108384350f74090433e7fcf79a606b8e797f065b130575932")},
{250000, newHashFromStr("000000000000003887df1f29024b06fc2200b55f8af8f35453d7be294df2d214")},
{267300, newHashFromStr("000000000000000a83fbd660e918f218bf37edd92b748ad940483c7c116179ac")},
{279000, newHashFromStr("0000000000000001ae8c72a0b0c301f67e3afca10e819efa9041e458e9bd7e40")},
{300255, newHashFromStr("0000000000000000162804527c6e9b9f0563a280525f9d08c12041def0a0f3b2")},
{319400, newHashFromStr("000000000000000021c6052e9becade189495d1c539aa37c58917305fd15f13b")},
{343185, newHashFromStr("0000000000000000072b8bf361d01a6ba7d445dd024203fafc78768ed4368554")},
{352940, newHashFromStr("000000000000000010755df42dba556bb72be6a32f3ce0b6941ce4430152c9ff")},
{382320, newHashFromStr("00000000000000000a8dc6ed5b133d0eb2fd6af56203e4159789b092defd8ab2")},
{400000, newHashFromStr("000000000000000004ec466ce4732fe6f1ed1cddc2ed4b328fff5224276e3f6f")},
{430000, newHashFromStr("000000000000000001868b2bb3a285f3cc6b33ea234eb70facf4dcdf22186b87")},
{460000, newHashFromStr("000000000000000000ef751bbce8e744ad303c47ece06c8d863e4d417efc258c")},
{490000, newHashFromStr("000000000000000000de069137b17b8d5a3dfbd5b145b2dcfb203f15d0c4de90")},
{520000, newHashFromStr("0000000000000000000d26984c0229c9f6962dc74db0a6d525f2f1640396f69c")},
{550000, newHashFromStr("000000000000000000223b7a2298fb1c6c75fb0efc28a4c56853ff4112ec6bc9")},
{560000, newHashFromStr("0000000000000000002c7b276daf6efb2b6aa68e2ce3be67ef925b3264ae7122")},
},
The above are the checkpoints of the Bitcoin mainnet. They are trusted and verified points in the blockchain. Since each checkpoint contains a block hash, one can construct a GetHeaders message using data from the checkpoints. Without checkpoints, it would be impossible to create a batch of GetHeaders requests: we cannot obtain the block hash of a header we do not yet have, and such a hash is required for the BlockLocatorHashes and HashStop fields of the GetHeaders message. To batch these requests, I created a blockManager method called batchCheckpointedBlkHeaders.
func (b *blockManager) batchCheckpointedBlkHeaders() {
    var queryMsgs []*headerQuery

    curHeight := b.headerTip
    curHash := b.headerTipHash
    nextCheckpoint := b.nextCheckpoint
    nextCheckptHash := nextCheckpoint.Hash
    nextCheckptHeight := nextCheckpoint.Height

    log.Infof("Fetching set of checkpointed blockheaders from "+
        "height=%v, hash=%v", curHeight, curHash)

    // Create one query per checkpoint region between the current header
    // tip and the final checkpoint.
    for nextCheckpoint != nil {
        endHash := nextCheckptHash
        endHeight := nextCheckptHeight
        tmpCurHash := curHash

        queryMsg := &headerQuery{
            Message: &wire.MsgGetHeaders{
                BlockLocatorHashes: blockchain.BlockLocator(
                    []*chainhash.Hash{&tmpCurHash},
                ),
                HashStop: *endHash,
            },
            startHeight:   int32(curHeight),
            initialHeight: int32(curHeight),
            startHash:     curHash,
            endHeight:     endHeight,
            initialHash:   tmpCurHash,
        }

        log.Infof("Fetching set of checkpointed blockheaders from "+
            "start_height=%v to end_height=%v", curHeight, endHeight)

        queryMsgs = append(queryMsgs, queryMsg)

        curHeight = uint32(endHeight)
        curHash = *endHash

        nextCheckpoint = b.findNextHeaderCheckpoint(int32(curHeight))
        if nextCheckpoint == nil {
            break
        }
        nextCheckptHeight = nextCheckpoint.Height
        nextCheckptHash = nextCheckpoint.Hash
    }

    // Add a final query for the headers beyond the last checkpoint. A zero
    // HashStop asks the peer for as many headers as it can send.
    queryMsg := &headerQuery{
        Message: &wire.MsgGetHeaders{
            BlockLocatorHashes: blockchain.BlockLocator(
                []*chainhash.Hash{nextCheckptHash},
            ),
            HashStop: zeroHash,
        },
        startHeight:   nextCheckptHeight,
        initialHeight: nextCheckptHeight,
        startHash:     *nextCheckptHash,
        endHeight:     nextCheckptHeight + wire.MaxBlockHeadersPerMsg,
        initialHash:   *nextCheckptHash,
    }

    log.Infof("Fetching final set of checkpointed blockheaders from "+
        "start_height=%v, hash_stop=%v", curHeight, zeroHash)

    queryMsgs = append(queryMsgs, queryMsg)

    log.Infof("Attempting to query for %v blockheader batches", len(queryMsgs))

    q := CheckpointedBlockHeadersQuery{
        blockMgr: b,
        msgs:     queryMsgs,
    }

    errChan := b.cfg.checkptHdrQueryDispatcher.Query(
        q.requests(), query.Cancel(b.quit), query.NoRetryMax(),
        query.NoTimeout(), query.KeepBatch(),
    )

    wm := b.cfg.checkptHdrQueryDispatcher
    defer func(wm query.WorkManager) {
        err := wm.Stop()
        if err != nil {
            log.Errorf("Unable to stop block header workmanager: %v", err)
        }
    }(wm)

    b.writeCheckptHeaders(errChan)
}
The batchCheckpointedBlkHeaders function creates multiple requests and hands them to the workmanager, which distributes them to its workers. The workmanager is what Neutrino currently uses to fetch checkpointed filter headers across peers.
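Conceptually, the workmanager fans the batched queries out over the connected peers. The sketch below is a deliberately simplified, hypothetical illustration of that fan-out; the real query package also handles retries, peer ranking, and time-outs, which this sketch leaves out.

package headersync

import "sync"

// headerJob is a hypothetical stand-in for one batched getheaders request
// (one checkpoint region).
type headerJob struct {
    startHeight, endHeight int32
}

// fanOut hands the queued jobs to one worker goroutine per peer. Each worker
// pulls the next unclaimed job from the shared channel, so faster peers end
// up serving more batches.
func fanOut(jobs []headerJob, peers []string,
    fetch func(peer string, job headerJob)) {

    jobChan := make(chan headerJob, len(jobs))
    for _, j := range jobs {
        jobChan <- j
    }
    close(jobChan)

    var wg sync.WaitGroup
    for _, p := range peers {
        wg.Add(1)
        go func(peer string) {
            defer wg.Done()
            for job := range jobChan {
                // Send the getheaders request for this region to the peer
                // and wait for its headers response.
                fetch(peer, job)
            }
        }(p)
    }
    wg.Wait()
}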
One thing to note is that although the process of fetching headers can be parallelized, the block headers still have to be validated in order. This is because of how headers are validated:
To validate a header, the previous-block hash in its metadata must match the hash of the block that the Neutrino node sees as preceding it. For example, to validate block 546, we need to check that the previous block referenced in its header is block 545. The node cannot carry out this validation if it does not yet have block 545. My first approach to fixing this was to create a separate header list structure for each checkpoint region; since the hash of the block at a checkpoint height is known, the subsequent blocks in that region could be validated. However, the second verification that occurs while validating a header put an end to that idea.
Proof-of-work verification is also carried out on the header: the expected difficulty of the block is recalculated at every retarget height, which is the height at which the proof-of-work difficulty is adjusted. To make this adjustment, the node needs the timestamp of the block at the previous retarget height. Most of the time, that block falls outside the checkpoint region, so a checkpoint region cannot autonomously validate its own headers.
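To make the first of these checks concrete, here is a small sketch using btcd's wire types (the function name is mine, not Neutrino's):

package headersync

import (
    "errors"

    "github.com/btcsuite/btcd/wire"
)

// checkHeaderConnects sketches the first validation step described above: the
// PrevBlock field of the new header must equal the hash of the header we have
// already accepted at the previous height.
func checkHeaderConnects(prev, hdr *wire.BlockHeader) error {
    prevHash := prev.BlockHash()
    if !hdr.PrevBlock.IsEqual(&prevHash) {
        return errors.New("header does not connect to the previous header")
    }
    // The second step, checking the proof-of-work difficulty, additionally
    // needs the timestamp of the header at the last retarget height (every
    // 2016 blocks on mainnet), which usually lies outside the current
    // checkpoint region.
    return nil
}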
To address this, I created a goroutine loop that ensures the headers are written and verified in order.
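The following is a rough, hypothetical illustration of that idea, with made-up names: responses may arrive from peers in any order, so a single goroutine buffers finished batches by start height and only writes and verifies a batch once it directly extends the current tip.

package headersync

// headerBatch is a hypothetical container for one checkpoint region's
// headers plus its height range.
type headerBatch struct {
    startHeight int32
    endHeight   int32
    // headers omitted for brevity
}

// writeInOrder buffers batches that arrive out of order and only hands a
// batch to writeAndVerify once it directly extends the current tip, so
// validation always sees headers in height order.
func writeInOrder(results <-chan headerBatch, startHeight int32,
    writeAndVerify func(headerBatch), quit <-chan struct{}) {

    pending := make(map[int32]headerBatch) // keyed by batch start height
    nextHeight := startHeight              // height the header store expects next

    for {
        select {
        case batch := <-results:
            pending[batch.startHeight] = batch

            // Flush every batch that is now contiguous with the tip.
            for {
                b, ok := pending[nextHeight]
                if !ok {
                    break
                }
                writeAndVerify(b)
                delete(pending, nextHeight)
                nextHeight = b.endHeight
            }

        case <-quit:
            return
        }
    }
}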
For more details about the implementation, check out the pull request.