Inital commit 🤗

This commit is contained in:
Sivert V. Sæther 2023-01-09 11:02:26 +01:00
commit 23ce3ef304
10 changed files with 721 additions and 0 deletions

1
.gitignore vendored Normal file
View File

@ -0,0 +1 @@
/target

18
Cargo.toml Normal file
View File

@ -0,0 +1,18 @@
[package]
name = "blues"
version = "0.1.0"
edition = "2021"
license = "MIT"
[dependencies]
nbd = { git = "https://github.com/tchajed/rust-nbd" }
internet-checksum = "0.2.1"
serde_json = "1.0.91"
env_logger = "0.10.0"
num_cpus = "1.15.0"
serde = { version = "1.0.152", features = ["derive"] }
tokio = { version = "1.24.1", features = ["full"] }
clap = { version = "4.0.32", features = ["derive"] }
icmp = "0.3.0"
rand = "0.8.5"
log = "0.4.17"

26
Makefile Normal file
View File

@ -0,0 +1,26 @@
export RUST_LOG = trace
CARGO := cargo
SU := doas
.PHONY: run debug release setcap
run: debug setcap
target/debug/blues -t 1 recon -ro 3000 -p 1 -t 150 -l 1
nbd: debug setcap
target/debug/blues nbd
debug:
$(CARGO) build
release:
$(CARGO) build --release
masscan: release setcap
target/release/blues -t 48 recon -ro 7000 -p 48 -t 350 -l 420 # 69
setcap:
test -d target/release && $(SU) setcap cap_net_raw+ep target/release/blues
test -d target/debug && $(SU) setcap cap_net_raw+ep target/debug/blues

3
TODO.md Normal file
View File

@ -0,0 +1,3 @@
# TODO
* Fuuuck ICMP -> no ports -> gotta do de sequence and idents
* Load only non-small round trip sorted ips for storage

110
src/bin/blues.rs Normal file
View File

@ -0,0 +1,110 @@
use std::{
io, net::IpAddr,
time::Duration,
thread::sleep};
use blues::{TIMEOUT, PingStore, IPStore, Scanner, get_rt};
use clap::{builder::ArgAction, Subcommand, Parser};
use nbd::server::Blocks;
use log::{trace, debug, info, error};
/// Blues "cloud" storage engine
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// Max worker threads to use for multi threaded operations
#[arg(short, long, value_parser, default_value_t = num_cpus::get())]
threads: usize,
/// Save file to store ping destinations
#[arg(short, long, value_parser, default_value = "ips.json")]
file: String,
/// Subcommands
#[command(subcommand)]
sub: Command,
}
/// Subcommands (enum)
#[derive(Subcommand, Debug)]
#[command()]
enum Command {
/// Scan the internet to find optimal destinations
Recon {
/// Please be a good netizen, how many ms beetween ping bursts
#[arg(short, long, value_parser, default_value_t = 150)]
throttle: usize,
/// How many parallel outstanding pings to have during scanning
#[arg(short, long, value_parser, default_value_t = 420)]
parallel: usize,
/// Ping timeout while scanning in ms
#[arg(short = 'o', long, value_parser, default_value_t = 7000)]
timeout: u64,
/// Scan until limit is reached
#[arg(short, long, value_parser, default_value_t = 0)]
limit: usize,
/// Random order of IPs
#[arg(short, long, action = ArgAction::SetTrue)]
rand: bool,
},
/// NBD server and client operations
NBD {
/// NBD device path
#[arg(short, long, value_parser, default_value = "/dev/nbd0")]
device: String,
}
}
fn main() -> io::Result<()> {
let args = Args::parse();
env_logger::Builder::from_env(
env_logger::Env::default().default_filter_or("info")
).init();
info!("Starting blues");
trace!("With args: {:#?}", args);
match args.sub {
Command::Recon { throttle, parallel, timeout, limit, rand } => {
debug!("Mode is Scan/Recon");
unsafe { TIMEOUT = Some(Duration::from_millis(timeout)) };
let mut scanner = Scanner::load(&args.file);
get_rt("blues Scanner worker", args.threads).block_on(
scanner.mass_scan(throttle, parallel, limit, rand));
info!("Saving to file: {}", args.file);
IPStore::from_scanner(&scanner).save(&args.file);
},
Command::NBD { device } => {
debug!("Mode is NBD");
let store = PingStore::load_clients(&args.file);
let mut data = vec![
0x49, 0x43, 0x4d, 0x50, 0x20, 0x62, 0x61, 0x6c,
0x6c, 0x65, 0x20, 0x6e, 0x65, 0x67, 0x65, 0x72];
// get_rt("blues NBD driver", args.threads).spawn(store.run());
data.append(&mut [0x66; 64].to_vec());
// data.append(&mut [0x66; 48].to_vec());
info!("Storing some data in a few pings!");
store.write_at(&data, 0)?;
// info!("Waiting for data corruption!");
// sleep(Duration::from_millis(3000));
let mut res: [u8; 64 + 16] = [0; 64 + 16];
// let mut res: [u8; 64] = [0; 64];
store.read_at(&mut res, 0)?;
if res.to_vec() == data {
info!("SUCCESS! Recived same data as stored!");
} else {
error!("Didn't recive the same data as stored!\nExpected: {data:?}\nGot: {res:?}");
}
}
}
Ok(())
}

169
src/blocks.rs Normal file
View File

@ -0,0 +1,169 @@
use std::{
sync::{Mutex, Arc},
time::Duration,
thread::sleep,
net::IpAddr,
vec::Vec, io};
use log::{trace, warn, error};
use tokio::{
sync::mpsc,
runtime, task};
use nbd::server::Blocks;
use icmp::IcmpSocket;
use crate::{
ICMP_PACKET, IPStore,
pinger::connect,
checksum,
get_rt};
const SIZE: usize = 64; // Bits of data in each ping
static WORD_COUNT: usize = SIZE / 8; // Number of 16-bit words in each ping
type Message = (usize, usize, Vec<u8>); // Message type for the write channel
pub struct Ping {
socks: Vec<IcmpSocket>,
ips: Vec<IpAddr>,
copies: usize,
}
pub struct PingStore {
pings: Arc<Mutex<Vec<Ping>>>,
size: u64,
}
impl PingStore {
pub fn new() -> Self {
Self {
pings: Arc::new(Mutex::new(vec![])),
size: 0,
}
}
pub fn load_clients(file: &str) -> Self {
let ips = IPStore::load(file);
let mut store = Self::new();
/*
let mut dstmap: Vec<(usize, IpAddr)> = vec![];
for dst in ips.dsts {
dstmap.push((dst.round_trip, dst.ip));
}
let sorted = dstmap.sort_by(|a, b| a.0.cmp(&b.0));
trace!("{:?}", sorted);
*/
for _ in 0..ips.dsts.len().div_floor(7) {
let offset = store.pings.lock().unwrap().len() * 7;
let mut ping = Ping::new();
for j in 0..7 {
ping.add(ips.dsts[offset + j].ip);
}
store.pings.lock().unwrap().push(ping);
store.size += 1;
}
store
}
fn read(&self, addr: usize) -> io::Result<Vec<u8>> {
trace!("Reading addr 0x{addr:x}");
let mut datas: Vec<Vec<u8>> = vec![];
let ping = &self.pings.lock().unwrap()[addr];
for i in 0..ping.copies {
let mut data: [u8; 28 + SIZE] = [0; 28 + SIZE];
ping.socks[i].recv(&mut data)?;
datas.push(data[28..(28 + SIZE)].to_vec());
}
let first: [u8; SIZE] = datas[0].clone()[..].try_into().unwrap();
let mut good: [u8; SIZE] = datas[0].clone()[..].try_into().unwrap();
for data in datas[1..].iter() {
if *data != good && *data == first {
good = first;
}
}
for i in 0..datas.len() {
if good != *datas[i] {
warn!("Sus response data in ping from \"{}\"", ping.ips[i]);
}
}
Ok(good.to_vec())
}
async fn write(&self, buf: &[u8], addr: usize, off: usize, size: usize) -> io::Result<()> {
let mut packets = vec![];
for i in addr..(size.div_ceil(SIZE)) {
let loc = i * SIZE;
packets.push(buf[loc..loc+SIZE].to_vec());
}
for (pingspan, packet) in packets.iter().enumerate() {
let offset = addr + pingspan;
trace!("Writing addr 0x{offset:x}");
self.ping(offset, packet)?;
}
Ok(())
}
fn ping(&self, addr: usize, data: &[u8]) -> io::Result<()> {
trace!("Sending store ping with addr 0x{addr:x}");
let mut packet = ICMP_PACKET.to_vec();
packet.append(&mut data.to_vec());
let checksum = checksum(&packet);
packet[2] = checksum[0];
packet[3] = checksum[1];
packet[6] = 0;
packet[7] = 1;
let ping = &mut self.pings.lock().unwrap()[addr];
let mut res = vec![];
for i in 0..ping.copies {
res.push(ping.socks[i].send(&packet)?);
}
Ok(())
}
}
impl Blocks for PingStore {
fn read_at(&self, buf: &mut [u8], off: u64) -> io::Result<()> {
let addr = (off as usize).div_floor(WORD_COUNT);
let buflen = buf.len();
let mut res = vec![];
for pingspan in 0..buflen.div_ceil(SIZE) {
res.append(&mut self.read(addr + pingspan)?.to_vec());
}
//buf.copy_from_slice(&res[0..((addr * 8) - off as usize)]);
buf.copy_from_slice(&res[0..buflen]);
Ok(())
}
fn write_at(&self, buf: &[u8], off: u64) -> io::Result<()> {
let addr = (off as usize).div_ceil(WORD_COUNT);
let buflen = buf.len();
get_rt("channel pusher", num_cpus::get().div_floor(3) + 1).block_on(
self.write(buf, addr, (addr * WORD_COUNT) - off as usize, buflen))?;
Ok(())
}
fn size(&self) -> io::Result<u64> {
Ok((self.pings.lock().unwrap().len() * 56) as u64)
}
fn flush(&self) -> io::Result<()> {
Ok(())
}
}
impl Ping {
fn new() -> Self {
Self {
socks: vec![],
ips: vec![],
copies: 0,
}
}
fn add(&mut self, ip: IpAddr) {
self.socks.push(connect(ip));
self.ips.push(ip);
self.copies += 1;
}
}

33
src/lib.rs Normal file
View File

@ -0,0 +1,33 @@
#![feature(bigint_helper_methods)]
#![feature(async_closure)]
#![feature(int_roundings)]
#![feature(never_type)]
#![feature(ip)]
pub mod scanner;
pub mod pinger;
pub mod blocks;
pub mod store;
pub use blocks::PingStore;
pub use scanner::Scanner;
pub use pinger::{
ICMP_PACKET, TIMEOUT, Pinger};
pub use store::IPStore;
#[inline]
pub fn checksum(bytes: &[u8]) -> [u8; 2] {
let mut calc = internet_checksum::Checksum::new();
calc.add_bytes(bytes);
calc.checksum()
}
#[inline]
pub fn get_rt(name: &str, threads: usize) -> tokio::runtime::Runtime {
match tokio::runtime::Builder::new_multi_thread()
.worker_threads(threads)
.thread_name(name)
.build() {
Err(err) => panic!("Unable to build tokio runtime! {:?}", err),
Ok(rt) => rt,
}
}

164
src/pinger.rs Normal file
View File

@ -0,0 +1,164 @@
use std::{
time::{Duration, Instant},
net::{Ipv4Addr, IpAddr},
io};
use log::{trace, debug, info};
use icmp::IcmpSocket;
use rand::random;
use crate::checksum;
pub static mut TIMEOUT: Option<Duration> = None;
pub static ICMP_PACKET: [u8; 8] = [
8, 0, // 8 Echo ping request, code 0
0, 0, // Index 2 and 3 are for ICMP checksum
0xde, 0xad, // Identifier
0, 1, // Sequence numbers
];
pub struct PingResponse {
pub round_trip: usize,
pub corrupt: bool,
pub small: bool,
pub data: [u8; 84],
}
pub type PingResult = Result<PingResponse, io::Error>;
pub struct Pinger {
pub data: [u8; 64],
pub seq: [u8; 2],
}
impl Pinger {
pub fn new() -> Self {
Self {
data: [0; 64],
seq: [1, 0],
}
}
pub fn data(mut self, data: [u8; 64]) -> Self {
self.data = data;
self
}
pub fn ping(&mut self, ip: IpAddr) -> PingResult {
let mut pack = ICMP_PACKET.to_vec();
pack[6] = self.seq[1];
pack[7] = self.seq[0];
pack.append(&mut self.data.clone().to_vec());
let checksum = checksum(&pack);
pack[2] = checksum[0];
pack[3] = checksum[1];
let mut socket = connect(ip);
let start = Instant::now();
match socket.send(&pack) {
Err(err) => {
debug!("Was unable to ping IP {}: {:?}", ip, err);
return Err(err);
},
Ok(count) => if count != pack.len() {
debug!("Sent {} of {} bytes to {}", count, pack.len(), ip);
},
};
self.inc_seq();
let mut response: [u8; 64 + 20] = [0; 64 + 20];
match socket.recv(&mut response) {
Err(err) => {
info!("Error scanning IP \"{}\": {:?}", ip, err);
Err(err)
}
Ok(size) => {
let mut result = PingResponse::new(response, start.elapsed().as_millis() as usize);
if size != 84 {
debug!("Didn't revice full response from \"{}\"", ip);
trace!("Response in question: {:?}", response);
result.corrupt = true;
}
if response[20 + 8..] != self.data {
if response[56..] == self.data[..28] {
result.small = true
} else {
debug!("Response payload was not the same as request in ping to \"{}\"", ip);
trace!("Response in question: {:?}", response);
result.corrupt = true;
}
}
Ok(result)
}
}
}
fn inc_seq(&mut self) {
let mut carry = false;
(self.seq[0], carry) = self.seq[0].carrying_add(1, carry);
if carry {
carry = false;
(self.seq[1], carry) = self.seq[1].carrying_add(1, carry);
}
if carry {
self.seq = [1, 0];
}
}
}
pub fn connect(ip: IpAddr) -> IcmpSocket {
let sock = match IcmpSocket::connect(ip) {
Err(err) => panic!("Unable to open ICMP socket: {err:?}"),
Ok(sock) => sock,
};
let timeout = unsafe { TIMEOUT };
if timeout.is_none() {
return sock;
}
if let Err(err) = sock.set_write_timeout(timeout) {
debug!("unable to set write timeout on socket: {}", err);
};
if let Err(err) = sock.set_read_timeout(timeout) {
debug!("unable to set read timeout on socket: {}", err);
};
sock
}
#[inline]
fn rand_ip4() -> Ipv4Addr {
Ipv4Addr::new(random(), random(), random(), random())
}
#[inline]
pub fn rand_ip() -> IpAddr {
let mut ip = rand_ip4();
while !ip.is_global() { ip = rand_ip4() };
IpAddr::V4(ip)
}
impl PingResponse {
fn new(data: [u8; 84], round_trip: usize) -> Self {
Self {
round_trip,
corrupt: false,
small: false,
data,
}
}
}
impl Default for PingResponse {
fn default() -> Self {
Self::new([0; 84], 0)
}
}
impl Default for Pinger {
fn default() -> Self {
Self::new()
}
}

110
src/scanner.rs Normal file
View File

@ -0,0 +1,110 @@
use std::{
vec::Vec, net::{Ipv4Addr, IpAddr},
time::Duration, thread::sleep};
use serde::{Deserialize, Serialize};
use log::{trace, debug, info, error};
use tokio::task;
use crate::{
pinger::rand_ip,
IPStore, Pinger};
//static PROBE_CHK: [u8; 2] = [0x51, 0x59];
static PROBE: [u8; 64] = [0x66; 64];
#[derive(Clone)]
pub struct Scanner {
pub dsts: Vec<Destination>,
pub dead: Vec<IpAddr>,
}
#[derive(Deserialize, Serialize, Clone)]
pub struct Destination {
pub round_trip: usize, // Round trip in ms
pub small: bool,
pub ip: IpAddr,
}
pub type ScanResult = Result<Destination, IpAddr>;
impl Scanner {
pub fn new() -> Self {
debug!("Initializing a blues::Scanner");
Self { dsts: vec![], dead: vec![] }
}
pub fn load(file: &str) -> Self {
let mut scanner = Self::new();
let store = IPStore::load(file);
scanner.dsts = store.dsts;
scanner.dead = store.dead;
scanner
}
#[inline]
fn next_ip(&self) -> IpAddr {
// let mut ip = rand_ip();
let mut ip = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 138));
while self.scanned(&ip) {
ip = rand_ip(); }
ip
}
#[inline]
fn scanned(&self, ip: &IpAddr) -> bool {
for dead in &self.dead {
if ip == dead { return true; }
}
for dst in &self.dsts {
if ip == &dst.ip {
return true;
}
}
false
}
async fn pop_futs(&mut self, futs: &mut Vec<task::JoinHandle<ScanResult>>) {
let fut = futs.pop().expect("Unable to pop of futures vec during mass_scan()!");
match fut.await {
Err(err) => {
error!("Joing future: {:?}", err);
},
Ok(res) => match res {
Ok(dst) => self.dsts.push(dst),
Err(ip) => self.dead.push(ip),
}
}
}
pub async fn mass_scan(&mut self, throttle: usize, parallel: usize, limit: usize, rand: bool) {
info!("Starting mass scan with a limit of {} and {} to randomized IP order!", limit, rand);
let duration = Duration::from_millis(throttle as u64);
let mut futs: Vec<task::JoinHandle<ScanResult>> = vec![];
for _i in 0..limit {
if futs.len() >= parallel {
self.pop_futs(&mut futs).await;
}
futs.push(task::spawn(scan(self.next_ip())));
sleep(duration);
}
while !futs.is_empty() {
self.pop_futs(&mut futs).await }
}
}
pub async fn scan(ip: IpAddr) -> Result<Destination, IpAddr> {
trace!("Scanning IP \"{}\"", ip);
if let Ok(res) = Pinger::new().data(PROBE).ping(ip) {
info!("Good result scanning IP \"{}\": {}ms", ip, res.round_trip);
if res.corrupt { return Err(ip) }
Ok(Destination { ip, small: res.small, round_trip: res.round_trip })
} else { Err(ip) }
}
impl Default for Scanner {
fn default() -> Self {
Self::new()
}
}

87
src/store.rs Normal file
View File

@ -0,0 +1,87 @@
use std::{
fs::OpenOptions,
io::{Write, Read},
str::from_utf8,
net::IpAddr,
vec::Vec};
use log::{trace, debug, error};
use serde::{Deserialize, Serialize};
use crate::{
scanner::Destination,
Scanner};
#[derive(Deserialize, Serialize)]
pub struct IPStore {
pub dsts: Vec<Destination>,
pub dead: Vec<IpAddr>,
}
impl IPStore {
pub fn new() -> Self {
Self { dsts: vec![], dead: vec![] }
}
pub fn from_scanner(scanner: &Scanner) -> Self {
// Self { dsts: scanner.dsts.clone(), dead: scanner.dead.clone() }
Self { dsts: scanner.dsts.clone(), dead: vec![] }
}
pub fn save(&self, file_name: &str) {
let mut file = OpenOptions::new()
.write(true).create(true)
.open(file_name).unwrap();
match serde_json::to_string(self) {
Err(err) => {
error!("Serializing IPStore for saving to file: {:?}", err);
},
Ok(out) => {
match file.write_all(out.as_bytes()) {
Err(err) => error!(
"Saving IPStore to file \"{}\": {:?}",
file_name, err),
Ok(ok) => ok
};
},
};
}
pub fn load(file_name: &str) -> Self {
let mut input: Vec<u8> = vec![];
match OpenOptions::new().read(true).open(file_name) {
Err(err) => {
debug!(
"Opening IPStore save file \"{}\": {:?}",
file_name, err);
return IPStore::new();
},
Ok(mut file) => match file.read_to_end(&mut input) {
Err(err) => {
error!(
"Reading IPStore save file \"{}\": {:?}",
file_name, err);
return IPStore::new();
},
Ok(len) => trace!("Read {} bytes from IPStore save file", len),
}
}
let data = from_utf8(&input).unwrap();
match serde_json::from_str(data) {
Err(err) => {
error!("Loading IPStore: {:?}", err);
IPStore::new()
},
Ok(store) => store,
}
}
}
impl Default for IPStore {
fn default() -> Self {
Self::new()
}
}