Experiment with streams, reading files with futures

Since our current hacksession (Vienna, metalab usually Tuesdays ~ 18:30) still remains in the concurrency realm. the last two weeks were about futures and the runtime implementations.

We were experimenting with streams and I wanted to play around with them as well. There are some tokio implementations for async file reading futuers, but since linux filesystems before kernel 5.1 do not really support non blocking file operations

I thought lets have fun breaking things ourselves.

As I mentioned this is not really non blocking I/O esp. since there are two ways to view futures in their current state.

One is to just see them as an eventloop that dispatches our futures to our underlying runtime and executes them. They don't really need a handler because their polling mechanism basically always defaults as "okay lets do this"

Async::Ready(T);

However more complex futures that are listening on non-blocking sockets / handlers need their wake handler callback. Since my kernel does not support async filereads, this is basically a blocking stream of futures returning one line per iterations.

It is more like a generator yield coroutine implemented as a stream.

In theory the same can be done in a generator / io-monad-like-structur, or just in a foreach loop that shares an income channel via multiple threads - this version would have the problem of ordering and others... anyhow.

From a generator / monad perspective: we would put in file buffers and we get out strings. This is the same what we do with our Streams, the underlying principles can be seen as different but to some degree those concepts are isomorphic.

so this is our toml file dependencies:

[dependencies]
futures = "0.1"
tokio = "0.1.21"

I will go through the parts and why I used them. A lot of decisions are PoC decisions and not analytic thought through choices.

extern crate futures;
extern crate tokio;

use tokio::prelude::*;
use std::fs::File;
use std::io::{BufReader, BufRead};
use core::borrow::{BorrowMut};

struct FileStream {
    buffered_reader: BufReader<File>,
}

impl Stream for FileStream {
    type Item = String;
    type Error = String;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {

        let mut line = String::new();
        let bytes = self.buffered_reader.borrow_mut().read_line(&mut line);

        if bytes.is_err() {
            println!("{}", "omg we're haven an error");
            return Ok(Async::Ready(None));
        }

        if bytes.unwrap() == 0 {
            println!("{}", "omg we're at the end");
            return Ok(Async::Ready(None));
        }

        Ok(Async::Ready(Some(line)))
    }
}


fn main() {
    let file = BufReader::new(File::open("./test-data/source-text1").unwrap());
    let file2 = BufReader::new(File::open("./test-data/source-text2").unwrap());

    let file_stream = FileStream {
        buffered_reader: file,
    }.map_err(|_| {});

    let file_stream2 = FileStream {
        buffered_reader: file2,
    }.map_err(|_| {});



    tokio::run( file_stream
        .select(file_stream2)
        .for_each(|text| {
            print!("{}", text); return Ok(())
        }));
}

Tokio is one runtime for futures, it's the current 'default' that's why I use it here. I know tokio implements async file operations as mentioned above, but where is the fun in that?

to my code.

struct FileStream {
    buffered_reader: BufReader<File>,
}

My buffered reader BufReader<File> is just a choice of convenience. I wanted to be able to read per line and that the cursor position is persisted. Since the focus was on the Future/Stream I did not want to invest into implementing this in a different way.

Next is the Stream implementation for our FileStream struct

impl Stream for FileStream {
    type Item = String;
    type Error = String;

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {

        let mut line = String::new();
        let bytes = self.buffered_reader.borrow_mut().read_line(&mut line);

        if bytes.is_err() {
            println!("{}", "omg we're haven an error");
            return Ok(Async::Ready(None));
        }

        if bytes.unwrap() == 0 {
            println!("{}", "omg we're at the end");
            return Ok(Async::Ready(None));
        }

        Ok(Async::Ready(Some(line)))
    }
}

Lets look at this common pattern that you have

 type Item = String;
 type Error = String;

a defined LHS and RHS (left hand side and right hand side) in our case for the Result<Async<self::Item>, self::Error>

this in theory would allow you to write a generic implementations also it allows you to specify certain trait boundaries for the types in a more structured manner.

Next every future or stream needs a poll method.

    fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {
    }

Poll is just a type alias to reduce the amount of code to be written.

pub type Poll<T, E> = Result<Async<T>, E>;

we could also just write

fn poll(&mut self) -> Result<Async<Option<Self::Item>>, Self::Error> {
}

so if you got long repeating type chains, don't forget to alias them. You can add some semantic context / make the type chain more concise.

Interesting fun fact. One difference between the future poll and the stream poll is that streams always return Options.

The point is that the

Ok(Async::Ready(None));

command terminates the stream.

Because there are more than two states for the future polling.

A future can in some respect seen as a fnOnce that will return a result exactly once. So for the basic functionality you just need two states within the poll.

// future got a result
Async::Ready(T);

// still waiting (tells the runtime to park the future)
Async::NotReady;

As i mentioned in this case I will use the future as eventloop so I don't care much about the internals of the kernel or how the system behaves besides it returns one line per poll call.

Poll method:

The poll method is the indicator for our system if a future is ready to return / execute something.

The problem with busy-waiting is solved via wake call. But the whole 'how does a future work for real non blocking / async IO' is a blogpost on it's own.

Important is that the poll method is the thing called by the runtime, so it returns the result to our 'consuming behavior'

Since futures are closures pushed to a thread and the incoming parameters are on the stack it has no return value! This is why the last returning future - if you use future in future - needs to return void.

That's because he thread keeps on running and where should it 'return the value too' ? However as soon as you join or use other operations a temporary stack can be built that passes the values to the next future. (This is pure speculation and I need to verify it)

What does this mean? You have to work on the heap or with other side-effects in futures. For those who find side-effects a fuzzy term. It just means outside of your function body, like writing something on the screen, as a file and such things.

Back to the code:

fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> {

        let mut line = String::new();
        let bytes = self.buffered_reader.borrow_mut().read_line(&mut line);

        if bytes.is_err() {
            println!("{}", "omg we're haven an error");
            return Ok(Async::Ready(None));
        }

        if bytes.unwrap() == 0 {
            println!("{}", "omg we're at the end");
            return Ok(Async::Ready(None));
        }

        Ok(Async::Ready(Some(line)))
    }

This code is semi straight forward. I create a new string where I will get the values of the linebuffer copied and transformed to a string.

The buffer internal is a Vec<u8> so a vector that contains binary values that will be transformed and pushed onto the String which is a vector of <u8> which is looked at / grouped by a defined encoding. UTF-8 in our case.

UTF-8 is a variable length encoding with a minimum of 8 bits per character. Characters with higher code points will take up to 32 bits. Quote from Wikipedia: "UTF-8 encodes each of the 1,112,064 code points in the Unicode character set using one to four 8-bit bytes (termed "octets" in the Unicode Standard).

stackoverflow

We're getting a line and writing it to our local string. That's why the string needs to be mutable and be passed as mutable reference. Also we need to borrow our reader mutable otherwise it cannot store the current cursor position within the buffered reader

let mut line = String::new();
let bytes = self.buffered_reader.borrow_mut().read_line(&mut line);

Next are our security checks:

The first one is to check if there is major problem it will terminate the stream in our case.

if bytes.is_err() {
    println!("{}", "omg we're haven an error");
    return Ok(Async::Ready(None));
}

The second check is if the file is at it's end.

if bytes.unwrap() == 0 {
    println!("{}", "omg we're at the end");
    return Ok(Async::Ready(None));
}

And in the end we just return the line to our consumers

Ok(Async::Ready(Some(line)))

To the main function

fn main() {
    let file = BufReader::new(File::open("./test-data/source-text1").unwrap());
    let file2 = BufReader::new(File::open("./test-data/source-text2").unwrap());

    let file_stream = FileStream {
        buffered_reader: file,
    }.map_err(|_| {});

    let file_stream2 = FileStream {
        buffered_reader: file2,
    }.map_err(|_| {});

    tokio::run( file_stream
        .select(file_stream2)
        .for_each(|text| {
            print!("{}", text); return Ok(())
        }));
}

We create 2 different file streams

 let file = BufReader::new(File::open("./test-data/source-text1").unwrap());
 let file2 = BufReader::new(File::open("./test-data/source-text2").unwrap());

 let file_stream = FileStream {
        buffered_reader: file,
    }.map_err(|_| {});

    let file_stream2 = FileStream {
        buffered_reader: file2,
    }.map_err(|_| {});

We just tell it to ignore the errors ;D ... in this case this was a PoC implementations for myself.

---- it seems the draft and the countryside internet deleted the end of my blogpost --- yay ... もっと ... もっと .... uno mas ... encore ...

We pass it to our run function which spawns our runtime and registers the future / stream to be executed.

tokio::run( ... )

We combine our two streams to run one after the other and flip flop between the streams. Select does this for us if you look at the implementations details basically it returns a stream that returns either the file_stream poll result or the file_stream2 poll result and switches a flag that indicates which stream is next.

file_stream
        .select(file_stream2)

We consume it with our for_each

.for_each(|text| {
            print!("{}", text); 
            return Ok(())
        });

Basically that's it :)

This is just so to reiterate over the problem. Maybe someone find's it useful :) feedback is as always welcome :)

Comments (2)

Add a comment
Mark's photo

I missed where the Err being made so I'm not sure... This type

Result<Async<self::Item>, self::Error>

looks peculiar. Wouldn't Async be the outer one? Usually when things go wrong, you only find out when resuming the awaited code, right?

j's photo

stuff ;)

Mark gg this is not the error you're looking for ;D .... you're complete right in this case the error should be void ().

I decided to not really have an error, this is obviously a possible endless loop logic bug. if the buffered stream return an error it will always just say 'nahhh just wait another iteration' :).

What can I say I am a fan of buzz lightyear ... to infinity and beyond. No seriously thank you for pointing that out :D I will add the why.