Rustコトハジメ

Rustで競技プログラミングをやる人の覚書のようなものです。

tonicのロードバランサを調査する

tonic 0.3.1

おれが作っているRaftライブラリlol (https://github.com/akiradeveloper/lol) の参考のため、tonicのロードバランシングについて調査する。Raftでは、クライアントがクラスタ内のノードのどれかにアクセス出来ればよいため、似たような機能が必要ではあるが、リーダーに直接アクセスするのが最善であり、ほしいものは厳密にはロードバランサではない。しかし、参考にはなる。

Channel::balance_channelは、ChannelEndpointのリストを動的に管理して、それに対して負荷が均一になるようにリクエストを送るものである。balance_listというのもあるが、これは静的。

    pub fn balance_channel<K>(capacity: usize) -> (Self, Sender<Change<K, Endpoint>>)
    where
        K: Hash + Eq + Send + Clone + 'static,
    {
        let (tx, rx) = channel(capacity);
        let list = DynamicServiceStream::new(rx);
        (Self::balance(list, DEFAULT_BUFFER_SIZE), tx)
    }

返り値は、(Channel, tx)であり、

ロードバランサーつきのChannel -> DynamicServiceStream (: Discover) -> rx という依存関係になる。このrxにEndpointリストの差分を送ると、あとは適当にやってくれる。

/// Distributes requests across inner services using the [Power of Two Choices][p2c].
///
/// As described in the [Finagle Guide][finagle]:
///
/// > The algorithm randomly picks two services from the set of ready endpoints and
/// > selects the least loaded of the two. By repeatedly using this strategy, we can
/// > expect a manageable upper bound on the maximum load of any server.

適当にというのは、p2cというアルゴリズムによってロードバランシングするということである。finagleというのはTwitterが作っているサーバフレームワークだったような気がする。というかふと思い出したが、おれはfinagle/finch貢献している。理由は忘れた。

impl<K: Hash + Eq + Clone> Discover for DynamicServiceStream<K> {
    type Key = K;
    type Service = Connection;
    type Error = crate::Error;

    fn poll_discover(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,

poll_discoverというのがおそらくChannelから呼ばれるのだが、ここでは、Endpointセットの差分について、接続を開始した状態に変換して返している。

pub struct Balance<D: Discover, Req> {
    discover: D,

    services: ReadyCache<D::Key, D::Service, Req>,
    ready_index: Option<usize>,

    rng: SmallRng,

    _req: PhantomData<Req>,
}

あとは、poll_discoverで返ってきた差分をReadyCacheというところで管理し続けてp2cアルゴリズムを実行してるのだろうと思われる。

tonicのEndpointのconnect_lazy調査

tonic 0.3.1

Endpoint::connect_lazyが何をしているのか見ていく。strictなconnectもそこまで大差はない。

tonicのEndpoint::connect_lazyでは、まずhttpコネクション一般を作ったあと、

これをservice::connectorによってServiceに変換する。(したがって実質無意味)

重要なのはChannelである。

    pub fn connect_lazy(&self) -> Result<Channel, Error> {
        let mut http = hyper::client::connect::HttpConnector::new();
        http.enforce_http(false);
        http.set_nodelay(self.tcp_nodelay);
        http.set_keepalive(self.tcp_keepalive);

        #[cfg(feature = "tls")]
        let connector = service::connector(http, self.tls.clone());

        #[cfg(not(feature = "tls"))]
        let connector = service::connector(http);

        Ok(Channel::new(connector, self.clone()))
    }

Channelは、ざっくりいうとendpointにconnectするサービスである。

    pub(crate) fn new<C>(connector: C, endpoint: Endpoint) -> Self
    where
        C: Service<Uri> + Send + 'static,
        C::Error: Into<crate::Error> + Send,
        C::Future: Unpin + Send,
        C::Response: AsyncRead + AsyncWrite + HyperConnection + Unpin + Send + 'static,
    {
        let buffer_size = endpoint.buffer_size.clone().unwrap_or(DEFAULT_BUFFER_SIZE);

        let svc = Connection::lazy(connector, endpoint);
        let svc = Buffer::new(Either::A(svc), buffer_size);

        Channel { svc }
    }

Connection::lazyは、ざっくりいうとEndpointの設定を使ってtowerのServiceをスタックしていく(TimeoutLayerなど)。

最後に、Reconnectorをスタックする。ここがポイント

        let connector = HyperConnect::new(connector, settings);
        let conn = Reconnect::new(connector, endpoint.uri.clone(), is_lazy);

        let inner = stack.layer(conn)

Reconnectは何をしているか?

    fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
        let mut state;

        loop {
            match self.state {
                State::Idle => {
                    trace!("poll_ready; idle");
                    match self.mk_service.poll_ready(cx) {
                        Poll::Ready(r) => r?,
                        Poll::Pending => {
                            trace!("poll_ready; MakeService not ready");
                            return Poll::Pending;
                        }
                    }

                    let fut = self.mk_service.make_service(self.target.clone());
                    self.state = State::Connecting(fut);
                    continue;
                }
                State::Connecting(ref mut f) => {
                    trace!("poll_ready; connecting");
                    match Pin::new(f).poll(cx) {
                        Poll::Ready(Ok(service)) => {
                            state = State::Connected(service);
                        }
                        Poll::Pending => {
                            trace!("poll_ready; not ready");
                            return Poll::Pending;
                        }
                        Poll::Ready(Err(e)) => {
                            trace!("poll_ready; error");

                            state = State::Idle;

                            if !(self.has_been_connected || self.is_lazy) {
                                return Poll::Ready(Err(e.into()));
                            } else {
                                self.error = Some(e);
                                break;
                            }
                        }
                    }
                }
                State::Connected(ref mut inner) => {
                    trace!("poll_ready; connected");

                    self.has_been_connected = true;

                    match inner.poll_ready(cx) {
                        Poll::Ready(Ok(())) => {
                            trace!("poll_ready; ready");
                            return Poll::Ready(Ok(()));
                        }
                        Poll::Pending => {
                            trace!("poll_ready; not ready");
                            return Poll::Pending;
                        }
                        Poll::Ready(Err(_)) => {
                            trace!("poll_ready; error");
                            state = State::Idle;
                        }
                    }
                }
            }

            self.state = state;
        }

        self.state = state;
        Poll::Ready(Ok(()))
    }

tower::Serviceには2つのメソッドがある。1つはpoll_ready、もう1つはcallcallが本体なのだが、まずpoll_readyでお伺いを立ててからcallする。なぜこうなっているかというと、やられる側から「今はやめてくれ」ということによって流量を調整するバックプレッシャーのためである。

この処理を超ざっくりいうと

  1. 最初はIdleからはじまる。ここでコネクションをつなぐfutを起動する
  2. コネクションをつなぐfut(fで表されている)がReadyになるまで「出直してこい」という。もし、最初の接続が失敗してしまった場合、lazyならばエラーを返してIdleからやり直すが、strictならば即エラーとなる
  3. 一回繋がったら、(仕組みは不明だが)接続状態であるinnerにリクエストを送っていいかお伺いする。なんらかの理由によって(keep-aliveのタイムアウトとか?)リクエストを送る状態にない場合は、一度Idleからやり直す

このような仕組みによって、Channelは、接続が切れたとしてもReconnectしてくれるようになる。

tokio::fsはただspawn_blockingしてるだけ

tokio::fsはファイルシステムの操作をasyncにして提供するライブラリなのだが、ファイルシステムの操作はシステムコールを呼んでいるためブロッキングである。これをどうやって非同期に偽造してるかというと、

// 例としてread
pub async fn read(path: impl AsRef<Path>) -> io::Result<Vec<u8>> {
    let path = path.as_ref().to_owned();
    asyncify(move || std::fs::read(path)).await
}

pub(crate) async fn asyncify<F, T>(f: F) -> io::Result<T>
where
    F: FnOnce() -> io::Result<T> + Send + 'static,
    T: Send + 'static,
{
    match sys::run(f).await {
        Ok(res) => res,
        Err(_) => Err(io::Error::new(
            io::ErrorKind::Other,
            "background task failed",
        )),
    }
}

asyncifyという関数でラップしてるだけ。では、asyncifyは何なのかというと、中で呼ばれてるsys::runは、spawn_blockingである。

では、spawn_blockingは一旦何なのかというと、

In general, issuing a blocking call or performing a lot of compute in a future without yielding is not okay, as it may prevent the executor from driving other futures forward. This function runs the provided closure on a thread dedicated to blocking operations. See the CPU-bound tasks and blocking code section for more information.

RustのFutureが採用する協調的マルチタスキングでは、自分が何もしない時には自分からCPUを放棄しないといけない。ふつうにブロッキングコールをすると、他のタスクが動くチャンスを少なくしてしまうから、ブロッキング専用のスレッドに逃がしている。

Futureのselect!マクロ

async fn task_one() { /* ... */ }
async fn task_two() { /* ... */ }

async fn race_tasks() {
    let t1 = task_one().fuse();
    let t2 = task_two().fuse();

    pin_mut!(t1, t2);

    select! {
        () = t1 => println!("task one completed first"),
        () = t2 => println!("task two completed first"),
    }
}

async fn count() {
    let mut a_fut = future::ready(4);
    let mut b_fut = future::ready(6);
    let mut total = 0;

    loop {
        select! {
            a = a_fut => total += a,
            b = b_fut => total += b,
            complete => break,
            default => unreachable!(), // never runs (futures are ready, then complete)
        };
    }
    assert_eq!(total, 10);
}

というサンプルコードを要点だけ説明する。

  • パターンマッチ文である。Readyが返ってきたものについて矢印の右辺を実行
  • 2番目のサンプルからわかるように、何度も呼ぶと残ってるFutureも実行する。1回だけなら、最初に返ってきたFutureだけ実行する
  • completeとdefaultは特殊なパターンマッチ(loopを抜けるために存在しているようなもの)
  • FuseFutureを要求する。なぜかというと、loopの中で使えるようにするため。一度ReadyとなったものはNotReadyを返してほしい。こうしないとパニックする可能性がある
  • ピンしているのは、Unpinを要求するため。Unpinを要求しているのは、select!自身がFutureを消費せず、select!後にもFutureにアクセスすることを許すため

2番目のa_fut, b_futはFusedFutureを実装していないように見えるがreadyが実装。

impl<T> FusedFuture for Ready<T> {
    fn is_terminated(&self) -> bool {
        self.0.is_none()
    }
}

なぜFuture::pollはPinをとるのか?

Futureトレイトはこのような形をしている。

trait Future {
    type Output;
    fn poll(
        // Note the change from `&mut self` to `Pin<&mut Self>`:
        self: Pin<&mut Self>,
        // and the change from `wake: fn()` to `cx: &mut Context<'_>`:
        cx: &mut Context<'_>,
    ) -> Poll<Self::Output>;
}

executerはpollを呼び、値がReadyとなることを期待する。もしNotReadyである場合は、下からwakeされるまで待つ。Futureというのは、ステートマシンだといえる。

本質的には、

trait SimpleFuture {
    type Output;
    fn poll(&mut self, wake: fn()) -> Poll<Self::Output>;
}

のようなものと変わりないが、wakeにコンテキスト(データ)が必要な場合に対応するために、Contextという下に隠す設計になっている。

なぜこのPinが必要かというと、async fn構文のためである。async fn構文を使うことによって、awaitするところを状態としたステートマシンを自動生成することが出来る。async fnは中で巨大なFutureを生成する。

さて、もしここで、async fnの中で参照を使っていて、そのFutureをmoveしようとするとどうなるかというと、この時にポインタの値が変わってしまう。moveが出来ないというのはRustプログラミングの中では致命的なので、この制約を取り去りたい。例えば実際に、mapする場合などはmoveする。

シンプルな例でいうとこのような場合である。

async {
    let mut x = [0; 128];
    let read_into_buf_fut = read_into_buf(&mut x);
    read_into_buf_fut.await;
    println!("{:?}", x);
}
struct ReadIntoBuf<'a> {
    buf: &'a mut [u8], // points to `x` below
}

struct AsyncFuture {
    x: [u8; 128],
    read_into_buf_fut: ReadIntoBuf<'what_lifetime?>,
}

それでは困るため、Pinを使う。Pinは、&mut T, &T, Box<T>などに対してTのアドレスを固定(ピン)する。やり方の一つとしては、Box::pinがある。

pub fn pin(x: T) -> Pin<Box<T>>

Pinの逆Unpinは、中のTを取り出したり、書き換えたりする能力を持つ。moveしても問題が起こらない型については、Unpinが実装されている。

ECR50: F「Relatively Prime Powers」

かなり難しい数学問題。ラフな解法のみ。

問題

xを因数分解した時、指数部分のGCDが1である場合、この数をエレガントと呼ぶ。

2からnまでの数字の中でエレガントな数は何個あるか?

制約: n=1018

解法

まず、指数部分のGCDが2であるというのはどういう状況かというと、例えば, 22 34のような状況である。これは、指数部分をGCDで割ることによって(21 32)2と出来る。

n以下にこの括弧部分が何個あるかというと、n1/2個である。

従って求める解は、包除原理より、

sum[i=1..] { n1/i mu(i) }

のように書ける。n1/iの部分はiが60くらいで0になるから、これはO(60)くらいで求まることになる。

オイラー関数関連

x以下でxと素なものの個数

これはオイラー関数で求まる。xの素因数をp1,p2,..pkとして、x(1-1/p1)(1-1/p2)...(1-1/pk)

既約分数との関係(言い換え)

これは、

1/x,2/x,...,x/xの中で「既約分数」の個数を求めてるのと同じこと。

x以下でxと素なものの和

例えば、x=15として、素なものを列挙すると1,2,4,7,8,11,13,14となる。個数は15(1-1/3)(1-1/5)=8より一致。

この和は、(1+14)+(2+13)+(4+11)+(7+8)で求まる。つまり、オイラー関数をphi(x)とすると、x*phi(x)/2となる。

証明: xと素な整数aをとる。gcd(a,x)=1より、gcd(a,x-a)=1である。これはつまり、x以下の整数のうちxと素なもののうち、aを見つけるとx-aも自動的に見つかることを意味している。今、a*a!=xであるから、上のようにペアをとることが可能。