In this post, we will talk about how to implement a Rust workflow that can process a large number of BGP data files as fast as we can. We will use BGPKIT Parser and Broker for data collection and parsing, and Rayon crate for parallelization of the code.
Task Overview
Before we begin to talk about the code design, we first need to introduce the data we are dealing with. We want to process the BGP data collected by various collectors, saved in compressed binary MRT format, and archived to files with a fixed interval. In this post, we are using RouteViews archive data as an example. The average data file size ranges from 2MB to 10MB by different collectors (AMSIX collector for example has pretty large files).
For processing, we can use the simplest task possible to do: sum the number of MRT records in all the files. We want to download and process all the updates files for one hour from all the collectors in RouteViews project. Here is the estimated amount of data we are dealing with:
35 collectors
5-minute interval — 12 files per collector
420 total number of files to download and process
840MB to 4.2GB total download size (it’s somewhere in between)
Ok! Now that we know what we want to do and have a sense of the estimated workload of the overall task, let’s coding!
Photo by Glenn Carstens-Peters on Unsplash
1. Sequential Parsing
Our first attempt to achieve the goal is to design and implement a naive sequential workflow as described below
find all BGP updates files within the hour of interest
iterate through each file, parse the MRT data and count the number of records
sum all record counts and print out the result
For this sequential workflow, we will need to pull in two dependencies into Cargo.toml
:
[dependencies]
bgpkit-parser = "0.7.2"
bgpkit-broker = "0.3.2"
The bgpkit-broker
handles looking for updates files within the hour, while bgpkit-parser
handles parsing each individual file.
Finding files
BGPKIT Broker indexes all available BGP MRT data archive files from both RouteViews and RIPE RIS in close-to-real-time. For each data file, it saves the following information:
project
:route-views
orriperis
collector
: the collector ID, e.g.rrc00
orroute-views2
url
: the URL to the corresponding MRT filetimestamp
: the UNIX time of the start time of the MRT data file.
With all this information indexed, we can then query the backend and retrieve files information as we want. BGPKIT Broker provides both RESTful API, Rust API, as well as a Python API. Here we use the Rust API to pull in the information we need:
let broker = BgpkitBroker::new_with_params(
"https://api.broker.bgpkit.com/v1",
QueryParams {
start_ts: Some(1640995200),
end_ts: Some(1640998799),
project: Some("route-views".to_string()),
data_type: Some("update".to_string()),
..Default::default()
});
for item in &broker {
println!("processing {:?}...", &item);
}
The above block queries the broker and prints out all information of the retrieved files' metadata. The BgpkitBroker::new_with_params
call accepts two parameters, one for the endpoint of the broker instance, and the other specifies the filtering criteria. In this example, we search for all BGP updates files from RouteViews with timestamps between 2022-01-01T00:00:00
and 2022-01-01T00:59:59
UTC. It prints out the output as the following:
processing BrokerItem { collector_id: "route-views.telxatl", timestamp: 1640997000, data_type: "update", url: "http://archive.routeviews.org/route-views.telxatl/bgpdata/2022.01/UPDATES/updates.20220101.0030.bz2" }...
processing BrokerItem { collector_id: "route-views.uaeix", timestamp: 1640997000, data_type: "update", url: "http://archive.routeviews.org/route-views.uaeix/bgpdata/2022.01/UPDATES/updates.20220101.0030.bz2" }...
processing BrokerItem { collector_id: "route-views.wide", timestamp: 1640997000, data_type: "update", url: "http://archive.routeviews.org/route-views.wide/bgpdata/2022.01/UPDATES/updates.20220101.0030.bz2" }...
processing BrokerItem { collector_id: "route-views2", timestamp: 1640997900, data_type: "update", url: "http://archive.routeviews.org/bgpdata/2022.01/UPDATES/updates.20220101.0045.bz2" }...
Parse each MRT file
Previously, in the for loop, we only print out the retrieved meta information of the MRT files. Now let's add the actual parsing of the files into the loop. The code is very simple, as designed:
let mut sum: usize = 0;
for item in &broker {
println!("processing {}...", &item.url);
let parser = BgpkitParser::new(&item.url).unwrap();
let count = parser.into_record_iter().count();
sum += count;
}
We first define a mutable variable sum
outside the loop. Then for each file, we create a new Parser instance by BgpkitParser::_new_(&item.url)
. Here, as our goal is to count the number of records, we call the parser's .into_record_iter()
function to create a iterator over the records of the file, and then .count()
to get the count of the records. Lastly, we add the count to the overall sum variable.
Run and timing
For testing, I use a fairly powerful VM on a host with AMD 3950x CPU (32 threads), then build the release build and time the release run. The runtime includes downloading the MRT files to my machine with 1Gpbs down link in Southern California.
cargo build --release
time cargo run --release --bin sequential
It ended up taking about 1 minute and 23 seconds to sequentially parse 144 MRT files from RouteViews for all available ones within the first hour of 2022 (UTC).
total number of records for 144 files is 10554212
real 1m23.081s
user 0m39.535s
sys 0m1.006s
2. Parallel Parsing
Since the parsing of each file is completely independent of each other, we can parse the files in parallel and then sum up the count for each thread at the end. In Rust with Rayon, this conversion is very simple.
Let's first add the dependency of Rayon first:
[dependencies]
bgpkit-parser = "0.7.2"
bgpkit-broker = "0.3.2"
rayon = "1.5.1"
Then we change the broker code one tiny bit to collect all meta information for files into a vector first.
let items = broker.into_iter().collect::<Vec<BrokerItem>>();
This would enable us to fully utilize rayon
's great syntax sugar to turn our sequential code into a parallel one.
let sum: usize = items.par_iter().map(|item| {
println!("processing {}...", &item.url);
let parser = BgpkitParser::new(&item.url).unwrap();
let count = parser.into_record_iter().count();
count
}).sum();
The key difference here is the calling of .par_iter()
. It turns a sequential iterator into a parallel iterator, and by default going to utilize all available cores on the host machine for scheduling. Then we call .map()
to define the parsing steps for each file, and then call .sum()
at the end to add all results up.
The final result is approximately 10x faster than the sequential version, and it took only 8 seconds to parse all MRT files and get the record counts.
total number of records is 10554212
real 0m8.086s
user 0m42.569s
sys 0m1.068s
The full code for this example is as follows:
The source code of the two examples is available on GitHub. Feel free to poke around and tweak it as you wish.
https://github.com/bgpkit/bgpkit-tutorials/tree/main/parallel-parsing