add metric for search split affinity #4998

Open
wants to merge 5 commits into main

trinity-1686a
Contributor

Description

fix #4996

How was this PR tested?

verified the metric exists by running a small cluster

@fulmicoton
Contributor

fulmicoton commented May 20, 2024

I am surprised by the placement code. I thought Francois had replaced it with something smarter a long time ago, but I might be mistaking it for something else.

Currently the code looks like this:

    pub async fn assign_jobs<J: Job>(
        &self,
        mut jobs: Vec<J>,
        excluded_addrs: &HashSet<SocketAddr>,
    ) -> anyhow::Result<impl Iterator<Item = (SearchServiceClient, Vec<J>)>> {
        let num_nodes = self.searcher_pool.len();

        let mut candidate_nodes: Vec<CandidateNodes> = self
            .searcher_pool
            .pairs()
            .into_iter()
            .filter(|(grpc_addr, _)| {
                excluded_addrs.is_empty()
                    || excluded_addrs.len() == num_nodes
                    || !excluded_addrs.contains(grpc_addr)
            })
            .map(|(grpc_addr, client)| CandidateNodes {
                grpc_addr,
                client,
                load: 0,
            })
            .collect();

        if candidate_nodes.is_empty() {
            bail!(
                "failed to assign search jobs. there are no available searcher nodes in the pool"
            );
        }
        jobs.sort_unstable_by(Job::compare_cost);

        let mut job_assignments: HashMap<SocketAddr, (SearchServiceClient, Vec<J>)> =
            HashMap::with_capacity(num_nodes);

        for job in jobs {
            sort_by_rendez_vous_hash(&mut candidate_nodes, job.split_id());
            // Of the two nodes with the best affinity for this split, pick the less loaded one.
            let chosen_node_idx = if candidate_nodes.len() >= 2 {
                usize::from(candidate_nodes[0].load > candidate_nodes[1].load)
            } else {
                0
            };
            let chosen_node = &mut candidate_nodes[chosen_node_idx];
            chosen_node.load += job.cost();

            job_assignments
                .entry(chosen_node.grpc_addr)
                .or_insert_with(|| (chosen_node.client.clone(), Vec::new()))
                .1
                .push(job);
        }
        Ok(job_assignments.into_values())
    }

Francois's algorithm computed the ideal target load per node, i.e. the average load.
We would then assign each job to the node with the best affinity for its split, as long as that node had not already exceeded the average.
We could even allow a small margin above the average.
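
To make the idea concrete, here is a rough sketch of that kind of placement. It is not Francois's actual code and not the Quickwit API: `SketchJob`, `affinity`, `assign_with_target_load` and the `margin` parameter are hypothetical names, nodes are plain strings, and the affinity score is a simple hash standing in for the rendez-vous hash. It also assumes "not exceeded" means the node's load before taking the job is still below the target.

```rust
use std::cmp::Reverse;
use std::collections::hash_map::DefaultHasher;
use std::collections::HashMap;
use std::hash::{Hash, Hasher};

/// Hypothetical stand-in for a search job: a split id and a cost.
#[derive(Debug, Clone)]
pub struct SketchJob {
    pub split_id: String,
    pub cost: usize,
}

/// Rendez-vous style score of a node for a split (higher is better).
/// Assumption: a plain std hash, used only for illustration.
fn affinity(node: &str, split_id: &str) -> u64 {
    let mut hasher = DefaultHasher::new();
    node.hash(&mut hasher);
    split_id.hash(&mut hasher);
    hasher.finish()
}

/// Assigns each job to the node with the best affinity whose current load
/// is still below the target load (average load * (1 + margin)).
/// Falls back to the least loaded node if no node is below the target.
pub fn assign_with_target_load(
    nodes: &[&str],
    mut jobs: Vec<SketchJob>,
    margin: f64,
) -> HashMap<String, Vec<SketchJob>> {
    let mut assignments: HashMap<String, Vec<SketchJob>> = HashMap::new();
    if nodes.is_empty() {
        return assignments;
    }
    let total_cost: usize = jobs.iter().map(|job| job.cost).sum();
    let target_load = (total_cost as f64 / nodes.len() as f64) * (1.0 + margin);
    let mut loads: Vec<usize> = vec![0; nodes.len()];

    // Place the most expensive jobs first.
    jobs.sort_by(|left, right| right.cost.cmp(&left.cost));

    for job in jobs {
        // Node indices ordered by decreasing affinity for this split.
        let mut by_affinity: Vec<usize> = (0..nodes.len()).collect();
        by_affinity.sort_by_key(|&idx| Reverse(affinity(nodes[idx], &job.split_id)));

        let chosen = by_affinity
            .iter()
            .copied()
            .find(|&idx| (loads[idx] as f64) < target_load)
            .unwrap_or_else(|| {
                (0..nodes.len())
                    .min_by_key(|&idx| loads[idx])
                    .expect("nodes is not empty")
            });

        loads[chosen] += job.cost;
        assignments
            .entry(nodes[chosen].to_string())
            .or_default()
            .push(job);
    }
    assignments
}
```

With a margin of 0 and jobs of equal cost, every node ends up with the same number of jobs, and each job still goes to the best-affinity node that has room. A larger margin trades balance for affinity.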

@fmassot was it used in a different part of the code or has it never been merged?

and remove comment about job assignment. it was made to create a
discussion, and the discussion now exists
Successfully merging this pull request may close these issues.

Add metrics to track split affinity ratio