Efficient personal search at large scale

Vespa includes a relatively unknown mode which provides personal search at massive scale for a fraction of the cost of alternatives: streaming search. In this article we explain streaming search and how to use it.

Imagine you are tasked with building the next Gmail, a massive personal data store centered around search. How do you do it? An obvious answer is to just use a regular search engine, write all documents to a big index and simply restrict queries to match documents belonging to a single user.

This works, but the problem is cost. Successful personal data stores have a tendency to become massive — the amount of personal data produced in the world outweighs public data by many orders of magnitude. Storing indexes in addition to raw data means paying for extra disk space for all this data and paying for the overhead of updating this massive index each time a user changes or adds data. Index updates are costly, especially when they need to be handled in real time, which users often expect for their own data. Systems like Gmail handle billions of writes per day, so this quickly becomes the dominating cost of the entire system.

However, when you think about it, there’s really no need to go through the trouble of maintaining global indexes when each user only searches her own data. What if we just maintain a separate small index per user? This makes both index updates and queries cheaper, but leads to a new problem: writes will arrive randomly over all users, which means we’ll need to read and write a user’s index on every update without help from caching. A billion writes per day translates to about 25k read-and-write operations per second at peak. Handling traffic at that scale means either using a few thousand spinning disks or storing all data on SSDs. Both options are expensive.

Large-scale data stores already solve this problem for writes by appending, using some variant of multilevel log storage. Could we leverage this and layer the index on top of a data store like that? That helps, but it means we need to do our own development to put these systems together in a way that performs at scale, every time, for both queries and writes. And we still pay the cost of storing the indexes in addition to the raw user data.

Do we need indexes at all though? With some reflection, it turns out that we don’t. Indexes consist of pointers from words/tokens to the documents containing them. This lets us find those documents faster than if we had to read the content of the documents to find the right ones, of course at the considerable cost of maintaining those indexes. In personal search, however, any query only accesses a small subset of the data, and the subsets are known in advance. If we take care to store the data of each subset together, we can achieve search with low latency by simply reading the data at query time — what we call streaming search. In most cases, most subsets of data (i.e., most users) are so small that this can be done serially on a single node. Subsets that are too large to stream quickly on a single node can be split over multiple nodes streaming in parallel.

Numbers

How many documents can be searched per node per second with this solution? Assuming a node with 500 MB/sec read speed (either from an SSD or multiple spinning disks) and an average compressed document size of 1 KB, a node can search at most 500 MB/sec / 1 KB/doc = 500,000 docs/sec. If each user stores 1000 documents on average, this gives a max throughput per node of 500 queries/second. This is not an exact computation, since it disregards time spent seeking and writing and the inefficiency of reading non-compacted data on one hand, and assumes an overly pessimistic zero effect from caching on the other, but it is a good indication that our solution is cost-effective.

What about latency? From the calculation above, the latency of finding the matching documents will be 2 ms on average. However, we usually care more about the 99th percentile latency (or similar). This will be driven by large users, whose data needs to be split among multiple nodes streaming in parallel. The max data size per node is then a tradeoff between latency for such users and the overall cost of executing their queries (fewer nodes per query is cheaper). For example, we can choose to store at most 50,000 documents per user per node, such that we get a max latency of 100 ms per query. Lastly, the total number of nodes decides the max parallelism, and hence latency, for the very largest users. For example, with 20 nodes in total in a cluster we can support 20 * 50k = 1 million documents for a single user with 100 ms latency.
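These numbers are easy to verify. Here is a minimal sketch of the same back-of-envelope arithmetic, using only the assumptions stated above (illustrative figures, not measurements):

# Back-of-envelope sizing for streaming search, using the assumptions above.
read_speed = 500_000_000                  # bytes/sec a node can stream from disk
doc_size = 1_000                          # average compressed document size in bytes
docs_per_user = 1_000                     # average documents per user
max_docs_per_user_per_node = 50_000
nodes = 20

docs_per_sec = read_speed / doc_size                        # 500,000 docs/sec per node
queries_per_sec = docs_per_sec / docs_per_user              # ~500 queries/sec per node
avg_latency = docs_per_user / docs_per_sec                  # ~2 ms for an average user
max_latency = max_docs_per_user_per_node / docs_per_sec     # ~100 ms per node
max_docs_single_user = nodes * max_docs_per_user_per_node   # ~1M docs at ~100 ms

print(docs_per_sec, queries_per_sec, avg_latency, max_latency, max_docs_single_user)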

All right — with this we have our cost-effective solution for implementing the next Gmail: store just the raw data of users in a log data store. Locate the data of each user on a single node in the system for locality (or, really, 2–3 nodes for redundancy), but split it over multiple nodes for users that grow large. Implement a fully functional search and relevance engine on top of the raw data store, which distributes queries to the right set of nodes for each user and merges the results. This will be cheap and efficient, but it sounds like a lot of work! It sure would be nice if somebody had already done all of it, run it at large scale for years and then released it as open source.

Well, as luck would have it, we already did this in Vespa. In addition to the standard indexing mode, Vespa includes a streaming mode for documents which provides this solution, implemented by layering the full search engine functionality over the raw data store built into Vespa. When this solution is compared to indexed search in Vespa, or to more complicated sharding solutions in Elasticsearch, for personal search applications, we typically see about an order of magnitude reduction in the cost of a system which can sustain the query and update rates needed by the application with stable latencies over long time periods. It has been used to implement applications such as storing and searching massive amounts of mail, personal typeahead suggestions, personal image collections, and private forum group content.

Using streaming search on Vespa

The steps to using streaming search on Vespa are:

  • Set streaming mode for the document type(s) in question in services.xml.
  • Write documents with a group name (e.g., a user id) in their id, by setting g=[groupid] in the third part of the document id, as in id:mynamespace:mydocumenttype:g=user123:doc123
  • Pass the group id at query time by setting the query property streaming.groupname, as in the sketch below.
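Putting these steps together, here is a minimal sketch of feeding and querying grouped documents over Vespa’s HTTP document and query APIs. It assumes an application with the document type above running on localhost:8080; the field names are purely illustrative:

import requests

base = "http://localhost:8080"

# Feed a document for user123: the g=user123 group is part of the document id.
doc = {"fields": {"subject": "Hello", "body": "A streaming search example"}}
resp = requests.post(
    f"{base}/document/v1/mynamespace/mydocumenttype/group/user123/doc123",
    json=doc,
)
resp.raise_for_status()

# Query only user123's data by passing streaming.groupname.
result = requests.post(
    f"{base}/search/",
    json={
        "yql": "select * from sources * where body contains 'example'",
        "streaming.groupname": "user123",
    },
)
print(result.json())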

That’s it! With those steps you have created a scalable, battle-proven personal search solution which is an order of magnitude cheaper than any alternative out there, with full support for structured and text search, advanced relevance including natural language and machine-learned models, and powerful grouping and aggregation for features like faceting. For more details see the documentation on streaming search. Have fun with it, and as usual let us know what you are building!

Machine-learned model serving at scale


Imagine you have a machine-learned model that you would like to use in some
application, for instance, a transformer model to generate vector
representations from text. You measure the time it takes for a single model
evaluation. Then, satisfied that the model can be evaluated quickly enough, you
deploy this model to production in some model server. Traffic increases, and
suddenly the model is executing much slower and can’t sustain the expected
traffic at all, severely missing SLAs. What could have happened?

You see, most libraries and platforms for evaluating machine-learned models are
by default tuned to use all available resources on the machine for model
inference. This means parallel execution utilizing a number of threads equal to
the number of available cores, or CPUs, on the machine. This is great for a
single model evaluation.

Unfortunately, this breaks down for concurrent evaluations. This is an
under-communicated and important point.

Let’s take a look at what happens. In the following, we serve a transformer
model using Vespa.ai. Vespa is a highly performant and
web-scalable open-source platform for applications that perform real-time data
processing over large data sets. Vespa.ai uses ONNX
Runtime under the hood for model acceleration. We’ll
use the original BERT-base model, a
12-layer, 109 million parameter transformer neural network. We test the
performance of this model on a 32-core Xeon Gold 2.6GHz machine. Initially,
this model can be evaluated on this particular machine in around 24
milliseconds.

Concurrency vs latency and throughput - 32 threads

Here, the blue line is the 95th percentile latency, meaning that 95% of all
requests have latency lower than this. The red line is the throughput: the
requests per second the machine can handle. The horizontal axis is the number
of concurrent connections (clients).

As the number of simultaneous connections increases, the latency increases
drastically. The maximum throughput is reached at around 10 concurrent
requests. At that point, the 95th percentile latency is around 150ms, pretty
far off from the expected 24ms. The result is a highly variable and poor
experience.

The type of application dictates the optimal balance between low latency and
high throughput. For instance, if the model is used for an end-user query,
(predictably) low latency is important for a given level of expected traffic.
On the other hand, if the model generates embeddings before ingestion in some
data store, high throughput might be more important. The driving force for both
is cost: how much hardware is needed to support the required throughput. As an
extreme example, if your application serves 10,000 queries per second with a
95th percentile latency requirement of 50ms, you would need around 200 machines with the
setup above.

Of course, if you expect only a minimal amount of traffic, this might be
totally fine. However, if you are scaling up to thousands of requests per
second, this is a real problem. So, we’ll see what we can do to scale this up
in the following.

Parallel execution of models

We need to explain the threading model used during model inference to see what
is happening here. In general, there are 3 types of threads: inference
(application), inter-operation, and intra-operation threads. This is a common
feature among multiple frameworks, such as TensorFlow, PyTorch, and ONNX
Runtime.

The inference threads are the threads of the main application. Each request
gets handled in its own inference thread, which is ultimately responsible for
delivering the result of the model evaluation given the request.

The intra-operation threads evaluate single operations with multi-threaded
implementations. This is useful for many operations, such as element-wise
operations on large tensors, general matrix multiplications, embedding lookups,
and so on. Also, many frameworks chunk together several operations into a
higher-level one that can be executed in parallel for performance.

The inter-operation threads are used to evaluate independent parts of the
model in parallel. For instance, a model containing two distinct paths joined
in the end might benefit from this form of parallel execution. Examples are
Wide and Deep models or two-tower encoder architectures.

Various thread pools in inference

In the example above, which uses ONNX Runtime, inter-operation threads are
disabled by default, while the number of intra-operation threads defaults to
the number of CPUs on the machine, in this case 32. So, each
concurrent request is handled in its own inference thread. Some operations,
however, are executed in parallel by employing threads from the intra-operation
thread pool. Since this pool is shared between requests, concurrent requests
need to wait for available threads to progress in the execution. This is why
the latency increases.
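If you are serving models directly with ONNX Runtime rather than through Vespa, the same thread pools are exposed on the session options. A minimal sketch (the model path is illustrative):

import onnxruntime as ort

opts = ort.SessionOptions()
opts.intra_op_num_threads = 1   # threads used inside a single operation
opts.inter_op_num_threads = 1   # threads used across independent operations
opts.execution_mode = ort.ExecutionMode.ORT_SEQUENTIAL  # no inter-op parallelism

# Each request is then evaluated sequentially in its own inference thread.
session = ort.InferenceSession("model.onnx", sess_options=opts)
# outputs = session.run(None, {"input_ids": ..., "attention_mask": ...})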

The model contains operations that are run both sequentially and in parallel.
That is why throughput increases to a certain level even as latency increases.
After that, however, throughput starts decreasing, as more threads are
performing CPU-bound work than there are physical cores in the machine.
This is obviously detrimental to performance due to excessive context switching between threads.

Scaling up

To avoid this thread oversubscription, we can ensure that each model runs
sequentially in its own inference thread. This avoids the competition between
concurrent evaluations for the intra-op threads. Unfortunately, it also forgoes
the benefit of speeding up a single model evaluation using parallel execution.

Let’s see what happens when we set the number of intra-op threads to 1.

Concurrency vs latency and throughput - 1 thread

As seen, the latency is relatively stable up to a concurrency equalling the
number of cores on the machine (around 32). After that, latency increases
because there are more threads than actual cores to execute them. The
throughput also increases up to this point, reaching a maximum of around 120
requests per second, a 40% improvement over the default settings. However, the
95th percentile latency is now around 250ms, far from expectations.

So, the model that initially seemed promising might not be suitable for
efficient serving after all.

The first generation of transformer models, like BERT-base used above, are
relatively large and slow to evaluate. As a result, more efficient models that
can be used as drop-in replacements using the same tokenizer and vocabulary
have been developed. One example is the
XtremeDistilTransformers family. These are
distilled from BERT and have accuracy similar to BERT-base on many different
tasks, with significantly lower computational complexity.

In the following, we will use the
XtremeDistil-l6-h256
model, which only has around 13M parameters compared to BERT-base’s 109M.
Despite having only 12% of the parameter count, the accuracy of this model is
very comparable to the full BERT-base model:

Distilled models accuracy
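As a rough illustration of the size difference, the two models can be loaded with the Hugging Face transformers library and their parameters counted. This is only a sketch; the model ids below are assumptions and not part of the experiment setup in this post:

from transformers import AutoModel, AutoTokenizer

def count_params(model_id):
    model = AutoModel.from_pretrained(model_id)
    return sum(p.numel() for p in model.parameters())

print(count_params("bert-base-uncased"))                       # roughly 109M parameters
print(count_params("microsoft/xtremedistil-l6-h256-uncased"))  # roughly 13M parameters

# The distilled model keeps BERT's vocabulary, so the tokenizer is a drop-in:
tokenizer = AutoTokenizer.from_pretrained("microsoft/xtremedistil-l6-h256-uncased")
print(tokenizer("a drop-in replacement")["input_ids"])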

Using the default number of threads (the same as available on the system), this
model can be evaluated on the CPU in around 4ms. However, it still suffers from
the same scaling issue as above with multiple concurrent requests. So, let’s
see how this scales with concurrent requests and single-threaded execution:

Concurrency vs latency and throughput with 1 intra-op thread on distilled model

As expected, the latency is much more stable until we reach concurrency levels equalling the number of cores on the machine. This gives a much better and more predictable experience. The throughput now tops out at around 1600 requests per second, vastly superior to the other model, which topped out at roughly 120 requests per second. As a result, much less hardware is needed to achieve the wanted levels of performance.

Experiment details

To measure the effects of scaling, we’ve used Vespa.ai, an open-source platform
for building applications that do real-time data processing over large data
sets. Designed to be highly performant and web-scalable, it is used for diverse
tasks such as search, personalization, recommendation, ads, auto-complete,
image and similarity search, comment ranking, and even finding
love.

Vespa.ai has many integrated features
and supports many use cases right out of the box. Thus, it offers a simplified
path to deployment in production without the complexity of maintaining many
different subsystems. We’ve used Vespa.ai as an easy-to-use model
server in this post.
In Vespa.ai, it is straightforward to tune the threading model to
use for each model:

<model-evaluation>
  <onnx>
    <models>
      <model name="reranker_margin_loss_v4">
        <intraop-threads> number </intraop-threads>
        <interop-threads> number </interop-threads>
        <execution-mode> parallel | sequential </execution-mode>
      </model>
    </models>
  </onnx>
</model-evaluation>

Also, it is easy to scale out horizontally to use additional nodes for model
evaluation. We have not explored that in this post.

The data in this post has been collected using Vespa’s
fbench tool,
which drives load to a system for benchmarking. Fbench provides detailed and
accurate information on how well the system manages the workload.

Summary

In this post, we’ve seen that the default thread settings are not suitable for
model serving in production, particularly for applications with a high degree
of concurrent requests. The competition for available threads between parallel
model evaluations leads to thread oversubscription and performance suffers. The
latency also becomes highly variable.

The problem is the shared intra-operation thread pool. Perhaps a different
threading model should be considered, which allows for utilizing multiple
threads in low traffic situations, but degrades to sequential evaluation when
high concurrency is required.

Currently, however, the solution is to ensure that models are running in their
own threads. To manage the increased latency, we turned to model distillation,
which effectively lowers the computational complexity without sacrificing
accuracy. There are additional optimizations available which we did not touch
upon here, such as model
quantization.
Another one that is important for transformer models is limiting input length,
as evaluation time is quadratic in the input length.

We have not considered GPU evaluation here, which can significantly accelerate
execution. However, cost at scale is a genuine concern here as well.

The under-communicated point here is that platforms that promise very low
latencies for inference are only telling part of the story. As an example,
consider a platform promising 1ms latency for a given model. Naively, this can
support 1000 queries per second. However, consider what happens if 1000
requests arrive at almost the same time: the last request would have to wait
almost 1 second before returning. This is far from the expected 1ms.
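The effect is easy to see with a trivial, idealized calculation, assuming strictly sequential 1 ms evaluations and a burst of 1000 simultaneous requests:

service_time_ms = 1.0
burst = 1000

# Request k has k requests ahead of it in the queue, plus its own evaluation.
latencies = [(k + 1) * service_time_ms for k in range(burst)]
print(latencies[0])            # 1 ms for the first request
print(latencies[-1])           # 1000 ms for the last request
print(sum(latencies) / burst)  # ~500 ms average latency during the burst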

Announcing vector streaming search: AI assistants at scale without breaking the bank


If you are using a large language model to build a personal assistant,
you often need to give it access to personal data such as email, documents or images.
This is usually done by indexing vector embeddings of the data in a vector database and retrieving by approximate nearest neighbor (ANN) search.

In this post we’ll explain why this is not a good solution for personal data
and introduce an alternative which is an order of magnitude cheaper while actually solving the problem:
Vector streaming search.

Let’s just build an ANN index?

Let’s say you’re building a personal assistant that works with personal data averaging 10k documents per user,
and that you want to scale to a million users – that is, 10B documents.
And let’s say you are using typical cost-effective embeddings of 384 bfloat16s – 768 bytes per document.
How efficient can we make this in a vector database?

Let’s try to handle it the normal way by maintaining a global (but sharded) approximate nearest neighbor vector index.
Queries will need to calculate distances for vectors in a random access pattern as they are found in the index,
which means they’ll need to be in memory to deliver interactive latency.
Here, we need 10B * 768 bytes = 7.68 TB of memory for the vectors,
plus about 20% for the vector index, for a total of about 9.2 TB of memory to store a single copy of the data.
In practice though, you need two copies to be able to deliver a user’s data reliably,
some headroom for other in-memory data (say 10%), and about 35% headroom for working memory.
This gives a grand total of 9.2 * 2 * 1.1 / 0.65 = 31 TB.

If we use nodes with 128 GB of memory, that works out to 31 TB / 128 GB = 242 nodes.
On AWS, we can use i4i.4xlarge nodes at a cost of about $33 per node per day, so our total cost becomes 242 * 33 = $8,000 per day.

Hefty, but at least we get a great solution, right? Well, not really.

The A in ANN stands for approximate – the results from an ANN index will be missing some documents,
including likely some of the very best ones. That is often fine when working with global data,
but is it really acceptable to miss the one crucial mail, photo or document the user needs to complete some task correctly?

In addition – ANN indexes shine when most of the vectors in the data are eligible for a given query,
that is when query filters are weak. But here we need to filter on the user’s own data,
so our filter is very strong indeed and our queries will be quite expensive despite all the effort of building the index.
In fact, it would be cheaper to not use the index at all (which is what Vespa would automatically do when given these queries).

Lastly, there’s write speed. A realistic speed here is about 8k inserts per node per second.
Since we have 2 * 10B / 242 = 82M documents per node, that means it will take about
82M / (8k * 3600) = 2.8 hours to feed the entire data set, even though we have this massive number of powerful nodes.
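For reference, here is the whole back-of-envelope estimate above in one small sketch; all inputs are the assumptions stated in the text:

# Sizing a global in-memory ANN index for 10B personal documents.
docs = 10_000_000_000
bytes_per_vector = 384 * 2            # 384 bfloat16s = 768 bytes per document
index_overhead = 0.20                 # ~20% extra for the ANN index
copies = 2                            # redundancy
other_data_headroom = 1.10            # ~10% other in-memory data
working_memory_fraction = 0.65        # ~35% headroom for working memory
node_memory = 128e9                   # 128 GB nodes (i4i.4xlarge)
node_cost_per_day = 33                # ~$33 per node per day on AWS
inserts_per_node_per_sec = 8_000

single_copy = docs * bytes_per_vector * (1 + index_overhead)                   # ~9.2 TB
total = single_copy * copies * other_data_headroom / working_memory_fraction   # ~31 TB
nodes = total / node_memory                                                    # ~242 nodes
cost_per_day = nodes * node_cost_per_day                                       # ~$8,000 per day
docs_per_node = copies * docs / nodes                                          # ~82M documents
feed_hours = docs_per_node / inserts_per_node_per_sec / 3600                   # ~2.8 hours

print(nodes, cost_per_day, docs_per_node, feed_hours)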

To recap, this solution has four problems as shown in this table:

Regular ANN for personal data
  ❌ Cost: All the vectors must be in memory, which becomes very expensive.
  ❌ Coverage: ANN doesn’t find all the best matches, problematic with personal data.
  ❌ Query performance: Queries are expensive to the point of making an ANN index moot.
  ❌ Write performance: Writing the data set is slow.

Can we do better?

Let’s consider some alternatives.

The first observation to make is that we are building a global index capable of searching all users’ data at once,
but we are not actually using this capability, since we always search in the context of a single user.
So, could we build a separate ANN index per user instead?

This actually makes the ANN indexes useful since there is no user filter. However, the other three problems remain.

ANN (approximate nearest neighbor) index per user
  ❌ Cost: All the vectors must be in memory, which becomes very expensive.
  ❌ Coverage: ANN doesn’t find all the best matches, problematic with personal data.
  ✅ Query performance: One index per user makes queries cheap.
  ❌ Write performance: Writing the data set is slow.

Can we drop the ANN index and do the vector calculations by brute force?
This is actually not such a bad option (and Vespa trivially supports it).
Since each user has a limited number of documents, there is no problem getting good latency by brute-forcing over a user’s vectors.
However, we still store all the vectors in memory, so the cost problem remains.

NN (exact nearest neighbor) for personal data
  ❌ Cost: All the vectors must be in memory, which becomes very expensive.
  ✅ Coverage: All the best matches are guaranteed to be found.
  ✅ Query performance: Cheap enough: one user’s data is a small subset of a node’s data.
  ✅ Write performance: Writing data is an order of magnitude faster than with ANN.

Can we avoid the memory cost? Vespa provides an option to mark vectors paged,
meaning portions of the data will be swapped out to disk.
However, since this vector store does not localize the data of each user,
we still need a good fraction of the data in memory to stay responsive, and even so both query and write speed will suffer.

NN (exact nearest neighbor) with paged vectors for personal data
  🟡 Cost: A significant fraction of the data must be in memory.
  ✅ Coverage: All the best matches are guaranteed to be found.
  🟡 Query performance: Reading vectors from disk with random access is slow.
  ✅ Write performance: Writing data is an order of magnitude faster than with ANN.

Can we do even better, by localizing the vector data of each user
and so avoid the egregious memory cost altogether while keeping good performance?
Yes, with Vespa’s new vector streaming search you can!

Vespa’s streaming search solution lets you make the user id a part of the document id
so that Vespa can use it to co-locate the data of each user on a small set of nodes and on the same chunk of disk.
This allows you to search a user’s data with low latency without keeping any user’s data in memory
or paying the cost of maintaining indexes at all.

This mode has been available for a long time for text and metadata search,
and we have now extended it to support vectors and tensors as well, both for search and ranking.

With this mode you can store billions of user vectors, alongside other data, on each node without running out of memory,
write them at very high throughput thanks to Vespa’s log data store, and run queries with:

  • High throughput: Data is co-located on disk, or in memory buffers for recently written data.
  • Low latency regardless of user data size: Vespa will,
    in addition to co-locating a user’s data, also automatically spread it over a sufficient number of nodes to bound query latency.

In addition you’ll see about an order of magnitude higher write throughput per node than with a vector indexing solution.

The resource driving cost instead moves to disk I/O capacity, which is what makes streaming so much cheaper.
To compare with our initial solution, which required 242 nodes with 128 GB of memory: streaming requires about 45 bytes of memory per document,
so we’ll be able to cram about 128 GB / 45 bytes * 0.65 = 1.84B documents onto each node.
We can then fit two copies of the 10B document corpus on 20B / 1.84B = 11 nodes.
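The corresponding arithmetic for streaming, again using only the numbers above:

# Sizing streaming search: only ~45 bytes per document needs to stay in
# memory; the vectors themselves live on disk.
docs = 10_000_000_000
copies = 2
memory_per_doc = 45                   # bytes of in-memory metadata per document
node_memory = 128e9                   # 128 GB nodes
working_memory_fraction = 0.65

docs_per_node = node_memory / memory_per_doc * working_memory_fraction  # ~1.84B documents
nodes = copies * docs / docs_per_node                                    # ~11 nodes

print(docs_per_node, nodes)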

Quite a reduction! In a very successful application you may want a little more to deliver sufficient query capacity
(see the performance case study below), but this is the kind of savings you’ll see for real production systems.

Vector streaming search for personal data
  ✅ Cost: No vector data (or other document data) needs to be in memory.
  ✅ Coverage: All the best matches are guaranteed to be found.
  ✅ Query performance: Localized disk reads are fast.
  ✅ Write performance: Writing data is faster even with less than 1/20 of the nodes.

You can also combine vector streaming search with regular text search
and metadata search with little additional cost, and with advanced machine-learned ranking on the content nodes.
These are features you’ll also need if you want to create an application that gives users high quality responses.

To use streaming search in your application, make these changes to it:

  • Set streaming search mode for the document type in services.xml:
        <documents>
            <document type="my-document-type" mode="streaming" />
        </documents>
  • Feed documents with ids that include the user id of each document by
    setting the group value on the id. Ids will then be of the form id:myNamespace:myType:g=myUserId:myLocalId, where the g=myUserId part is new.
  • Set the user id to search on each query by setting the parameter
    streaming.groupname to the user id.

See the streaming search documentation for more details,
and try out the vector streaming search sample application to get started.

Performance case study

To measure the performance of Vespa’s vector streaming search we deployed a modified version of the
nearest neighbor streaming performance test
to Vespa Cloud.
We changed the node resources
and count for container and content nodes to fit the large scale use case.

The dataset used is synthetically generated and consists of 48B documents, spread across 3.7M users.
The average number of documents per user is around 13,000, and the distribution of documents per user is as follows:

  Documents per user    Percentage of users
  100                   35%
  1,000                 28%
  10,000                22%
  50,000                10%
  100,000               5%
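As a quick sanity check, the stated averages follow directly from this distribution:

# Sanity check of the documents-per-user distribution above.
distribution = {100: 0.35, 1_000: 0.28, 10_000: 0.22, 50_000: 0.10, 100_000: 0.05}
users = 3_700_000

avg_docs_per_user = sum(docs * share for docs, share in distribution.items())
print(avg_docs_per_user)                  # ~12,500, i.e. around 13,000
print(avg_docs_per_user * users / 1e9)    # ~46B documents, in the ballpark of the 48B fed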

We used 20 content nodes with the following settings to store around 2.4B documents per content node (redundancy=1).
These nodes equate to the AWS i4i.4xlarge instance with one 3,750 GB AWS Nitro local SSD.

<nodes deploy:environment="perf" count="20">
    <resources vcpu="16" memory="128Gb" disk="3750Gb" storage-type="local" architecture="x86_64"/>
</nodes>

Content nodes

Vespa Cloud console showing the 20 content nodes allocated to store the dataset.

We used the following settings for container nodes. The node count was adjusted based on the particular test to run.
These nodes equate to the AWS Graviton 2 c6g.2xlarge instance.

<nodes deploy:environment="perf" count="32">
    <resources
        vcpu="8"
        memory="16Gb"
        disk="20Gb"
        storage-type="remote"
        architecture="arm64"/>
</nodes>

Feeding performance

The schema
in the application has two fields:

  • field id type long
  • field embedding type tensor<bfloat16>(x[384])

The embeddings are randomly generated by a document processor
while feeding the documents. In total each document is around 800 bytes, including the document id.
Example document put for user with id 10000021:

{
    "put":"id:test:test:g=10000021:81",
    "fields":{
        "id":81,
        "embedding":[
            0.424140,0.663390,
            ..,
            0.261550,0.860670
        ]
    }
}

To feed the dataset we used three instances of Vespa CLI
running in parallel on a non-AWS machine in the same geographic region (us east).
This machine has 48 CPUs and 256 GB of memory, and used between 40 and 48 CPU cores during feeding.
The total feed throughput was around 300k documents per second, and the total feed time was around 45 hours.

Feed throughput

Vespa Cloud console showing feed throughput towards the end of feeding 48B documents.

Container nodes

Vespa Cloud console showing the 32 container nodes allocated when feeding the dataset.

Query performance

To analyze query performance we focused on users with 1k, 10k, 50k and 100k documents each.
For each of these four groups we drew between 160k and 640k random user ids to generate query files with 10k queries each.
Each query uses the nearestNeighbor
query operator to perform an exact nearest neighbor search over all documents for a given user.
Example query for user with id 10000021:

yql=select * from sources * where {targetHits:10}nearestNeighbor(embedding,qemb)
&input.query(qemb)=[...]
&streaming.groupname=10000021
&ranking.profile=default
&presentation.summary=minimal
&hits=10

This query returns the 10 closest documents according to the angular distance between the document embeddings and the query embedding.
See how the default ranking profile
is
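For reference, the angular distance used for ranking here is simply the angle between the query and document vectors. A small illustrative sketch (not Vespa’s implementation):

import numpy as np

def angular_distance(a, b):
    cos = np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
    return float(np.arccos(np.clip(cos, -1.0, 1.0)))  # angle in radians

query = np.random.rand(384).astype(np.float32)
doc = np.random.rand(384).astype(np.float32)
print(angular_distance(query, doc))  # smaller means more similar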

Yahoo Mail turns to Vespa to do RAG at scale

Yahoo Mail is one of the largest mail providers in the world. Now they’re also taking a shot at being the most
innovative with their recent release of AI-driven features which let you
ask questions of your mailbox
and tell it to do things for you.

At the core of these features you find 1) a large language model which can understand and generate text,
and 2) a retrieval system that finds the relevant information in your inbox to feed into this model,
typically by semantic search using vector embeddings. These two components, together with the orchestration
that combines them, nowadays go under the moniker RAG – Retrieval-Augmented Generation.

We’re in the middle – or at the feeble start? – of a massive boom in this technology, and so there’s no
lack of tools that allow you to build your own RAG demos. However, Yahoo’s aim is to make this work for all of
their users while being so cost-effective that it can still be offered for free, and for this they have
naturally turned to Vespa.ai. Vespa is the only vector database technology that:

  • lets you implement a cost-effective RAG system using personal data,
  • supports vector embeddings, structured data and full text in the same queries and ranking functions, and
  • is proven to operate effectively, reliably storing and searching trillions of documents.

Making interaction with email an order of magnitude simpler and faster for this many people is a massively
meaningful endeavor, and we’re excited to be helping the team as they build the new intelligent Yahoo Mail,
and to see what features they’ll be adding next. To see for yourself, you can sign up at
Yahoo Mail levelup,
and if you want to build your own production scale RAG system, we recommend our fully open source
documentation search RAG application as a starting point.