Hands-On RAG guide for personal data with Vespa and LlamaIndex

This blog post is a hands-on RAG tutorial demonstrating how to use Vespa streaming mode for cost-efficient retrieval of personal data. You can read more about Vespa streaming search in these two blog posts:

This blog post is also available as a runnable notebook where you can have this app up and running on Vespa Cloud in minutes.

The blog post covers:

  • Configuring Vespa and using Vespa streaming mode with PyVespa.
  • Using Vespa’s native built-in embedders in combination with streaming mode.
  • Ranking in Vespa, including hybrid retrieval and ranking methods, freshness (recency) features, and Vespa Rank Fusion.
  • Query federation and blending retrieved results from multiple sources/schemas.
  • Connecting LlamaIndex retrievers with a Vespa app to build generative AI pipelines.

TL;DR: Vespa streaming mode

Vespa’s streaming search solution lets you make the user a part of the document ID so that Vespa can use it to co-locate the data of each user on a small set of nodes and the same chunk of disk.
Streaming mode allows searching over a user’s data with low latency without keeping any user’s data in memory or paying the cost of managing indexes.

  • There is no accuracy drop for vector search, as it uses exact vector search
  • Several orders of magnitude higher write throughput (no expensive index builds to support approximate search)
  • Documents (including vector data) are 100% disk-based, significantly reducing deployment cost
  • Queries are restricted to content belonging to the user ID (groupname)
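Exact vector search is affordable here because a streaming query only touches one user’s documents, so every stored vector can be compared with the query instead of consulting an approximate index. The sketch below illustrates that brute-force idea in pure Python; the in-memory store, the user names, and the 3-dimensional vectors are hypothetical stand-ins and say nothing about how Vespa lays out data on disk.

```python
import math
from typing import Dict, List, Tuple

# Hypothetical toy data: each user (group) owns a few documents with tiny vectors.
store: Dict[str, Dict[str, List[float]]] = {
    "user-a": {"doc1": [1.0, 0.0, 0.0], "doc2": [0.7, 0.7, 0.0]},
    "user-b": {"doc3": [0.0, 1.0, 0.0]},
}


def cosine(a: List[float], b: List[float]) -> float:
    """Cosine similarity between two non-zero vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm


def exact_knn(group: str, query: List[float], k: int) -> List[Tuple[str, float]]:
    """Score every vector in one group (no approximate index) and return the top k."""
    scored = [(doc_id, cosine(vec, query)) for doc_id, vec in store.get(group, {}).items()]
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return scored[:k]


print(exact_knn("user-a", [1.0, 0.1, 0.0], k=2))
```

Because the scan is restricted to one group, the cost grows with the size of that user’s data rather than with the whole corpus, which is what keeps exhaustive scoring practical.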

Storage cost is the primary cost driver of Vespa streaming mode; no data is in memory. Avoiding memory usage lowers deployment costs significantly.
For example, Vespa Cloud allows storing streaming mode data at below $0.30 per GB per month. Yes, that is per month.

Getting started with LlamaIndex and PyVespa

The focus is on using the streaming mode feature in combination with multiple Vespa schemas; in our case,
we imagine building RAG over personal mail and calendar data, allowing effortless query federation and blending
of the results from multiple data sources for a given user.

First, we must install dependencies:

! pip3 install pyvespa llama-index

Synthetic Mail & Calendar Data

There are few public email datasets because people care about their privacy, so this notebook uses synthetic data to examine how to use Vespa streaming mode.
We create two generator functions that return Python dicts with synthetic mail and calendar data.

Notice that each dict has three keys: id, groupname, and fields.

This is the expected feed format for PyVespa feed operations; PyVespa uses these keys to build the Vespa /document/v1 API request(s).
The groupname key is only relevant with streaming mode.
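As an illustration of that mapping, the sketch below turns one feed dict into the path and JSON body of a /document/v1 request for a document that belongs to a group. The namespace `assistant`, the example address, and the helper name `to_document_v1_request` are hypothetical; PyVespa performs this step for you, and its internals may differ.

```python
import json
from typing import Tuple
from urllib.parse import quote


def to_document_v1_request(doc: dict, namespace: str, doctype: str) -> Tuple[str, str]:
    """Map a feed dict with id/groupname/fields keys to a /document/v1 path and JSON body."""
    # URL-encode the group and id so characters such as "@" are safe in the path.
    group = quote(str(doc["groupname"]), safe="")
    doc_id = quote(str(doc["id"]), safe="")
    path = f"/document/v1/{namespace}/{doctype}/group/{group}/{doc_id}"
    body = json.dumps({"fields": doc["fields"]})
    return path, body


example = {"id": 1, "groupname": "jo@example.com", "fields": {"subject": "Hello"}}
path, body = to_document_v1_request(example, namespace="assistant", doctype="mail")
print(path)  # /document/v1/assistant/mail/group/jo%40example.com/1
```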

mail

from typing import Iterator

def synthetic_mail_data_generator() -> Iterator[dict]:
    synthetic_mails = [
        {
            "id": 1,
            "groupname": "[email protected]",
            "fields": {
                "subject": "LlamaIndex news, 2023-11-14",
                "to": "[email protected]",
                "body": """Hello Llama Friends 🦙 LlamaIndex is 1 year old this week! 🎉 To celebrate, we're taking a stroll down memory 
                    lane on our blog with twelve milestones from our first year. Be sure to check it out.""",
                "from": "[email protected]",
                "display_date": "2023-11-15T09:00:00Z"
            }
        },
        {
            "id": 2,
            "groupname": "[email protected]",
            "fields": {
                "subject": "Dentist Appointment Reminder",
                "to": "[email protected]",
                "body": "Dear Jo Kristian ,\nThis is a reminder for your upcoming dentist appointment on 2023-12-04 at 09:30. Please arrive 15 minutes early.\nBest regards,\nDr. Dentist",
                "from": "[email protected]",
                "display_date": "2023-11-15T15:30:00Z"
            }
        },
        {
            "id": 1,
            "groupname": "[email protected]",
            "fields": {
                "subject": "Wildlife Update: Giraffe Edition",
                "to": "[email protected]",
                "body": "Dear Wildlife Enthusiasts 🦒, We're thrilled to share the latest insights into giraffe behavior in the wild. Join us on an adventure as we explore their natural habitat and learn more about these majestic creatures.",
                "from": "[email protected]",
                "display_date": "2023-11-12T14:30:00Z"
            }
        },
        {
            "id": 1,
            "groupname": "[email protected]",
            "fields": {
                "subject": "Antarctica Expedition: Penguin Chronicles",
                "to": "[email protected]",
                "body": "Greetings Explorers 🐧, Our team is embarking on an exciting expedition to Antarctica to study penguin colonies. Stay tuned for live updates and behind-the-scenes footage as we dive into the world of these fascinating birds.",
                "from": "[email protected]",
                "display_date": "2023-11-11T11:45:00Z"
            }
        },
        {
            "id": 1,
            "groupname": "[email protected]",
            "fields": {
                "subject": "Space Exploration News: November Edition",
                "to": "[email protected]",
                "body": "Hello Space Enthusiasts 🚀, Join us as we highlight the latest discoveries and breakthroughs in space exploration. From distant galaxies to new technologies, there's a lot to explore!",
                "from": "[email protected]",
                "display_date": "2023-11-01T16:20:00Z"
            }
        },
        {
            "id": 1,
            "groupname": "[email protected]",
            "fields": {
                "subject": "Ocean Discovery: Hidden Treasures Unveiled",
                "to": "[email protected]",
                "body": "Dear Ocean Explorers 🌊, Dive deep into the secrets of the ocean with our latest discoveries. From undiscovered species to underwater landscapes, our team is uncovering the wonders of the deep blue.",
                "from": "[email protected]",
                "display_date": "2023-10-01T10:15:00Z"
            }
        }
    ]
    for mail in synthetic_mails:
        yield mail  

calendar

Similarly, for the calendar data:

from typing import Iterator

def synthetic_calendar_data_generator() -> Iterator[dict]:
    calendar_data = [
       
        {
            "id": 1,
            "groupname": "[email protected]",
            "fields": {
                "subject": "Dentist Appointment",
                "to": "[email protected]",
                "body": "Dentist appointment at 2023-12-04 at 09:30 - 1 hour duration",
                "from": "[email protected]",
                "display_date": "2023-11-15T15:30:00Z",
                "duration": 60,
            }
        },
        {
            "id": 2,
            "groupname": "[email protected]",
            "fields": {
                "subject": "Public Cloud Platform Events",
                "to": "[email protected]",
                "body": "The cloud team continues to push new features and improvements to the platform. Join us for a live demo of the latest updates",
                "from": "public-cloud-platform-events",
                "display_date": "2023-11-21T09:30:00Z",
                "duration": 60,
            }
        }
    ]
    for event in calendar_data:
        yield event

Defining a Vespa application

PyVespa helps us build the Vespa application package.
A Vespa application package comprises configuration files, code (plugins), and models.

We define two Vespa schemas for our mail and calendar data. PyVespa offers a programmatic API for creating the schema. Ultimately, the programmatic representation is serialized to files (<schema-name>.sd).

In the following, we define the fields and their types. Note that we set mode to streaming, which enables Vespa streaming mode for this schema. Other valid modes are indexed and store-only.

mail schema

from vespa.package import Schema, Document, Field, FieldSet, HNSW
mail_schema = Schema(
            name="mail",
            mode="streaming",
            document=Document(
                fields=[
                    Field(name="id", type="string", indexing=["summary", "index"]),
                    Field(name="subject", type="string", indexing=["summary", "index"]),
                    Field(name="to", type="string", indexing=["summary", "index"]),
                    Field(name="from", type="string", indexing=["summary", "index"]),
                    Field(name="body", type="string", indexing=["summary", "index"]),
                    Field(name="display_date", type="string", indexing=["summary"]),
                    Field(name="timestamp", type="long", indexing=["input display_date", "to_epoch_second", "summary", "attribute"], is_document_field=False),
                    Field(name="embedding", type="tensor<bfloat16>(x[384])",
                        indexing=["\"passage: \" . input subject .\" \". input body", "embed e5", "attribute", "index"],
                        ann=HNSW(distance_metric="angular"),
                        is_document_field=False
                    )
                ],
            ),
            fieldsets=[
                FieldSet(name="default", fields=["subject", "body", "to", "from"])
            ]
)

In the mail schema, we have six document fields; these are provided by us when we feed documents of type mail to this app.
The fieldset defines which fields are matched against when we do not mention explicit field names when querying. We can add as many fieldsets as we like without duplicating content.

In addition to the fields within the document, there are two synthetic fields in the schema, timestamp and embedding, which use Vespa indexing expressions that take inputs from the document and perform conversions.

  • The timestamp field takes display_date as input and uses the to_epoch_second converter to convert the display date into an epoch timestamp. This is useful because we can then calculate the document’s age and use the freshness(timestamp) rank feature during ranking phases.
  • The embedding tensor field takes the subject and body as input, prefixed with "passage: ". It feeds that string into an embed function that uses an embedding model to map it into a 384-dimensional embedding vector with bfloat16 precision. Vectors in Vespa are represented as tensors.
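The two synthetic fields can be approximated outside Vespa to see what they produce. In this sketch, the helper names are hypothetical, Python’s datetime stands in for Vespa’s to_epoch_second converter, and the embedding call itself is omitted; only the input string that the indexing expression builds for the e5 embedder is reproduced.

```python
from datetime import datetime


def to_epoch_second(display_date: str) -> int:
    """Approximate Vespa's to_epoch_second converter for an ISO-8601 UTC string."""
    # Older Python versions reject a trailing "Z" in fromisoformat, so map it to +00:00.
    return int(datetime.fromisoformat(display_date.replace("Z", "+00:00")).timestamp())


def embedding_input(subject: str, body: str) -> str:
    """Build the string that '"passage: " . input subject . " " . input body' produces."""
    return "passage: " + subject + " " + body


print(to_epoch_second("2023-11-15T09:00:00Z"))  # 1700038800
print(embedding_input("Dentist Appointment Reminder", "Please arrive 15 minutes early."))
```

The "passage: " prefix follows the e5 model family’s convention of marking document-side text; the resulting string, not the raw fields, is what gets embedded.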

calendar schema

from vespa.package import Schema, Document, Field, FieldSet, HNSW
calendar_schema = Schema(
            name="calendar",
            inherits="mail",
            mode="streaming",
            document=Document(inherits="mail",
                fields=[
                    Field(name="duration", type="int", indexing=["summary", "index"]),
                    Field(name="guests", type="array<string>", indexing=["summary", "index"]),
                    Field(name="location", type="string", indexing=["summary", "index"]),
                    Field(name="url", type="string", indexing=["summary", "index"]),
                    Field(name="address", type="string", indexing=["summary", "index"])
                ]
            )
)

The calendar schema inherits from the mail schema, meaning we don’t have to define the embedding field for the calendar schema.

Configuring embedders

The observant reader might have noticed the e5 argument to the embed expression in the above mail schema embedding field. The e5 argument references a component of the type hugging-face-embedder. In this example, we use the e5-small-v2 text embedding model that maps text to 384-dimensional vectors.

from vespa.package import ApplicationPackage, Component, Parameter

vespa_app_name = "assistant"
vespa_application_package = ApplicationPackage(
        name=vespa_app_name,
        schema=[mail_schema, calendar_schema],
        components=[Component(id="e5", type="hugging-face-embedder",
            parameters=[
                Parameter("transformer-model", {"url": "https://github.com/vespa-engine/sample-apps/raw/master/simple-semantic-search/model/e5-small-v2-int8.onnx"}),
                Parameter("tokenizer-model", {"url": "https://raw.githubusercontent.com/vespa-engine/sample-apps/master/simple-semantic-search/model/tokenizer.json"})
            ]
        )]
) 

We share and reuse the same embedding model for both schemas. Note that embedding inference is resource-intensive.

Ranking

In the last step of configuring the Vespa app, we add ranking profiles by adding rank-profile definitions to the schemas. Vespa supports phased ranking and has a rich set of built-in rank features.

One can also define custom functions with ranking expressions.
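The first-phase expressions in the profiles below can be emulated in plain Python to see how they behave. This is a minimal sketch, not Vespa’s implementation: it assumes the freshness feature’s default linear decay over 90 days, takes nativeRank values as given inputs, treats the angular distance as an angle in radians, and assumes hits are kept only when they score strictly above rank_score_drop_limit.

```python
import math
from typing import List

# Assumption: freshness with a linear scale and a maxAge of 3 * 30 days;
# check the Vespa rank feature reference for the exact defaults.
MAX_AGE_SECONDS = 3 * 30 * 24 * 60 * 60


def freshness(timestamp: int, now: int, max_age: int = MAX_AGE_SECONDS) -> float:
    """Linear freshness: 1.0 for a brand-new document, 0.0 once it is max_age old."""
    # Future timestamps are clamped to age 0 (a choice made for this sketch).
    age = max(now - timestamp, 0)
    return max(1.0 - age / max_age, 0.0)


def keywords_and_freshness(native_subject: float, native_body: float,
                           timestamp: int, now: int) -> float:
    """Mirror of my_function: nativeRank(subject) + nativeRank(body) + freshness(timestamp).

    The nativeRank values are passed in; computing them is out of scope here.
    """
    return native_subject + native_body + freshness(timestamp, now)


def semantic_first_phase(angle: float, threshold: float = 0.75) -> float:
    """Mirror of the semantic profile: cosine of the angular distance, or -1 if not above threshold."""
    cosine = max(0.0, math.cos(angle))
    return cosine if cosine > threshold else -1.0


def apply_drop_limit(scores: List[float], drop_limit: float) -> List[float]:
    """Keep hits scoring above rank_score_drop_limit (strict comparison is an assumption)."""
    return [s for s in scores if s > drop_limit]


now = 1_700_000_000
print(freshness(now - 45 * 24 * 3600, now))  # 0.5: halfway through the 90-day window
print(apply_drop_limit([semantic_first_phase(0.3), semantic_first_phase(1.2)], 0.1))
```

The -1 returned below the threshold is what lets the drop limit of 0.1 remove weak semantic matches entirely rather than just ranking them last.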

from vespa.package import RankProfile, Function, GlobalPhaseRanking, FirstPhaseRanking

keywords_and_freshness = RankProfile(
    name="default", 
    functions=[Function(
        name="my_function", expression="nativeRank(subject) + nativeRank(body) + freshness(timestamp)"
    )],
    first_phase=FirstPhaseRanking(
        expression="my_function",
        rank_score_drop_limit=0.02
    ),
    match_features=["nativeRank(subject)", "nativeRank(body)", "my_function", "freshness(timestamp)"],
)

semantic = RankProfile(
    name="semantic", 
    functions=[Function(
        name="cosine", expression="max(0,cos(distance(field, embedding)))"
    )],
    inputs=[("query(q)", "tensor<float>(x[384])"), ("query(threshold)","", "0.75")],
    first_phase=FirstPhaseRanking(
        expression="if(cosine > query(threshold), cosine, -1)",
        rank_score_drop_limit=0.1
    ),
    match_features=["cosine", "freshness(timestamp)", "distance(field, embedding)", "query(threshold)"],
)

fusion = RankProfile(
    name="fusion",
    inherits="semantic",
    functions=[
        Function(
            name="keywords_and_freshness", expression=" nativeRank(subject) + nativeRank(body) + freshness(timestamp)"
        ),
        Function(
            name="semantic", expression="cos(distance(field,embedding))"
        )

    ],
    inputs=[("query(q)", "tensor<float>(x[384])"), ("query(threshold)", "", "0.75")],
    first_phase=FirstPhaseRanking(
        expression="if(cosine > query(threshold), cosine, -1)"
    ),
)

Efficient personal search at large scale

Vespa includes a relatively unknown mode which provides personal search at massive scale for a fraction of the cost of alternatives: streaming search. In this article we explain streaming search and how to use it.

Imagine you are tasked with building the next Gmail, a massive personal data store centered around search. How do you do it? An obvious answer is to just use a regular search engine, write all documents to a big index and simply restrict queries to match documents belonging to a single user.

This works, but the problem is cost. Successful personal data stores have a tendency to become massive: the amount of personal data produced in the world outweighs public data by many orders of magnitude. Storing indexes in addition to raw data means paying for extra disk space for all this data and paying for the overhead of updating this massive index each time a user changes or adds data. Index updates are costly, especially when they need to be handled in real time, which users often expect for their own data. Systems like Gmail handle billions of writes per day, so this quickly becomes the dominating cost of the entire system.

However, when you think about it, there’s really no need to go through the trouble of maintaining global indexes when each user only searches her own data. What if we just maintain a separate small index per user? This makes both index updates and queries cheaper, but leads to a new problem: writes will arrive randomly over all users, which means we’ll need to read and write a user’s index on every update without help from caching. A billion writes per day translates to about 25k read-and-write operations per second at peak. Handling traffic at that scale means either using a few thousand spinning disks or storing all data on SSDs. Both options are expensive.
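The 25k figure can be reproduced with simple arithmetic. The text does not state a peak-to-average ratio, so the sketch assumes peak traffic of roughly twice the daily average; with that assumption the result lands near the quoted number.

```python
SECONDS_PER_DAY = 24 * 60 * 60

writes_per_day = 1_000_000_000
average_writes_per_second = writes_per_day / SECONDS_PER_DAY  # about 11.6k/s

# Hypothetical peak-to-average ratio; real traffic patterns vary.
peak_factor = 2.0
peak_writes_per_second = average_writes_per_second * peak_factor

# Each write to a per-user index implies reading and rewriting that index.
print(round(average_writes_per_second), round(peak_writes_per_second))
```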

Large-scale data stores already solve this problem for appending writes by using some variant of multilevel log storage. Could we leverage this and layer the index on top of a data store like that? That helps, but means we need to do our own development to put these systems together in a way that performs at scale every time, for both queries and writes. And we still pay the cost of storing the indexes in addition to the raw user data.

Do we need indexes at all, though? With some reflection, it turns out that we don’t. Indexes consist of pointers from words/tokens to the documents containing them. This allows us to find those documents faster than would be possible if we had to read the content of the documents to find the right ones, at the considerable cost of maintaining those indexes. In personal search, however, any query only accesses a small subset of the data, and the subsets are known in advance. If we take care to store the data of each subset together, we can achieve search with low latency by simply reading the data at query time, which is what we call streaming search. In most cases, most subsets of data (i.e., most users) are so small that this can be done serially on a single node. Subsets of data that are too large to stream quickly on a single node can be split over multiple nodes streaming in parallel.
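The core idea of streaming search fits in a few lines: writes are plain appends to a user’s co-located data, and a query reads that user’s documents and matches them on the fly. This is a toy, in-memory sketch with hypothetical names and naive whitespace tokenization, not a model of Vespa’s storage engine or matching.

```python
from collections import defaultdict
from typing import DefaultDict, List

# Hypothetical raw store: documents are appended per user, with no inverted index.
raw_store: DefaultDict[str, List[str]] = defaultdict(list)


def write(user: str, text: str) -> None:
    """A write is just an append to the user's co-located data; no index update."""
    raw_store[user].append(text)


def streaming_search(user: str, term: str) -> List[str]:
    """Read every document of one user at query time and match the term directly."""
    term = term.lower()
    # .get avoids creating an empty entry for users that have no data.
    return [doc for doc in raw_store.get(user, []) if term in doc.lower().split()]


write("alice", "Dentist appointment on Monday")
write("alice", "Lunch with Bob")
write("bob", "Dentist invoice")
print(streaming_search("alice", "dentist"))
```

Write cost stays constant no matter how much data exists, and query cost is proportional only to the one user’s data, which is the trade-off the paragraph above argues for.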

Numbers

How many documents can be searched per node per second with this solution? Assuming a node with 500 MB/s read speed (either from an SSD or multiple spinning disks) and a 1 KB average compressed document size, the disk can search at most 500 MB/s ÷ 1 KB/doc = 500,000 docs/s. If each user stores 1,000 documents on average, this gives a maximum throughput per node of 500 queries/s. This is not an exact computation, since on the one hand we disregard time used to seek and write and the inefficiency from reading non-compacted data, and on the other hand we assume an overly pessimistic zero effect from caching, but it is a good indication that our solution is cost-effective.
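The throughput estimate is a two-step division. The sketch uses decimal units (1 KB = 1,000 bytes, 1 MB = 10^6 bytes) so the numbers come out as round as in the text.

```python
read_bytes_per_second = 500 * 10**6  # 500 MB/s sequential read speed
avg_doc_bytes = 1_000                # 1 KB average compressed document
docs_per_user = 1_000                # average documents per user

# Documents one node can stream per second, then queries per second if each
# query must read all of one user's documents.
docs_per_second = read_bytes_per_second // avg_doc_bytes
queries_per_second = docs_per_second // docs_per_user
print(docs_per_second, queries_per_second)  # 500000 500
```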

What about latency? From the calculation above, we see that the latency from finding the matching documents will be 2 ms on average (1,000 documents at 500,000 docs/s). However, we usually care more about the 99th percentile latency (or similar). This will be driven by large users, which need to be split among multiple nodes streaming in parallel. The maximum data size per node is then a tradeoff between latency for such users and the overall cost of executing their queries (fewer nodes per query is cheaper). For example, we can choose to store at most 50,000 documents per user per node, so that we get a maximum latency of 100 ms per query. Lastly, the total number of nodes decides the maximum parallelism, and hence the latency, for the very largest users. For example, with 20 nodes in total in a cluster, we can support 20 × 50k = 1 million documents for a single user with 100 ms latency.
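The latency figures follow from the per-node streaming rate of 500,000 documents per second stated in the text, applied to the number of documents a node must read for one query. Variable names are illustrative.

```python
docs_per_second = 500_000           # per-node streaming rate stated in the text
avg_user_docs = 1_000               # average user
max_docs_per_user_per_node = 50_000 # chosen cap per user per node
nodes = 20                          # total nodes in the cluster

# Multiply before dividing so the results are exact floats.
avg_latency_ms = avg_user_docs * 1000 / docs_per_second                # 2.0
max_latency_ms = max_docs_per_user_per_node * 1000 / docs_per_second   # 100.0
max_docs_single_user = nodes * max_docs_per_user_per_node              # 1,000,000

print(avg_latency_ms, max_latency_ms, max_docs_single_user)
```

Raising the per-node cap lowers the number of nodes each large-user query touches (cheaper) but raises its worst-case latency, which is the tradeoff described above.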

All right — with this we have our cost-effective solution to implement the next Gmail: Store just the raw data of users, in a log-level store. Locate the data of each user on a single node in the system for locality (or, really 2–3 nodes for redundancy), but split over multiple nodes for users that grow large. Implement a fully functional search and relevance engine on top of the raw data store, which distributes queries to the right set of nodes for each user and merges the results. This will be cheap and efficient, but it sounds like a lot of work! It sure would be nice if somebody already did all of it, ran it at large scale for years and then released it as open source.
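The architecture described here can be sketched as two pieces: choosing which nodes hold a user’s data, and merging per-node hits at query time. The hash-based placement below is a hypothetical simplification chosen for illustration, not Vespa’s actual bucket distribution, and it ignores the two or three replicas kept for redundancy.

```python
import hashlib
import heapq
import math
from typing import Dict, List, Tuple

# Hypothetical cluster of 20 content nodes and the per-user, per-node cap from the text.
NODES = [f"node-{i}" for i in range(20)]
MAX_DOCS_PER_NODE = 50_000


def nodes_for_user(user: str, doc_count: int) -> List[str]:
    """Hash the user id to a start node and spread large users over consecutive nodes."""
    start = int(hashlib.sha256(user.encode("utf-8")).hexdigest(), 16) % len(NODES)
    needed = min(len(NODES), max(1, math.ceil(doc_count / MAX_DOCS_PER_NODE)))
    return [NODES[(start + i) % len(NODES)] for i in range(needed)]


def merge_results(per_node_hits: Dict[str, List[Tuple[float, str]]],
                  k: int) -> List[Tuple[float, str]]:
    """Merge every node's (score, doc_id) hits into one global top-k list."""
    all_hits = (hit for hits in per_node_hits.values() for hit in hits)
    return heapq.nlargest(k, all_hits)


print(nodes_for_user("alice", 1_000))         # small user: a single node
print(len(nodes_for_user("big", 1_000_000)))  # 20
print(merge_results({"node-1": [(0.9, "d1"), (0.2, "d2")], "node-2": [(0.5, "d3")]}, k=2))
```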

Well, as luck would have it, we already did this in Vespa. In addition to the standard indexing mode, Vespa includes a streaming mode for documents which provides this solution, implemented by layering the full search engine functionality over the raw data store built into Vespa. When this solution is compared to indexed search in Vespa or more complicated sharding solutions in Elasticsearch for personal search applications, we typically see about an order of magnitude reduction in the cost of achieving a system that can sustain the query and update rates needed by the application with stable latencies over long time periods. It has been used to implement various applications such as storing and searching massive amounts of mail, personal typeahead suggestions, personal image collections, and private forum group content.

Using streaming search on Vespa

The steps to using streaming search on Vespa are:

  • Set streaming mode for the document type(s) in question in services.xml.
  • Write documents with a group name (e.g., a user ID) in their ID, by setting g=[groupid] in the third part of the document ID, as in id:mynamespace:mydocumenttype:g=user123:doc123
  • Pass the group ID in queries by setting the query property streaming.groupname.
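The document ID and query property from these steps can be written down as follows. This sketch only builds a string and a parameter dict; sending the query to a Vespa endpoint (for example with an HTTP client or PyVespa) is left out, and the helper names are hypothetical.

```python
from typing import Dict


def streaming_document_id(namespace: str, doctype: str, group: str, local_id: str) -> str:
    """Compose a document ID whose third part carries the group as g=<group>."""
    return f"id:{namespace}:{doctype}:g={group}:{local_id}"


def streaming_query_params(group: str, user_query: str) -> Dict[str, str]:
    """Query parameters that restrict a search to one group via streaming.groupname."""
    return {
        "yql": "select * from sources * where userQuery()",
        "query": user_query,
        "streaming.groupname": group,
    }


print(streaming_document_id("mynamespace", "mydocumenttype", "user123", "doc123"))
# id:mynamespace:mydocumenttype:g=user123:doc123
print(streaming_query_params("user123", "dentist appointment"))
```

The same group value must appear in both places: the one written into the document ID at feed time is the one the query names, which is how Vespa knows which co-located data to stream.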

That’s it! With those steps you have created a scalable, battle-proven personal search solution which is an order of magnitude cheaper than any alternative out there, with full support for structured and text search, advanced relevance including natural language and machine-learned models, and powerful grouping and aggregation for features like faceting. For more details see the documentation on streaming search. Have fun with it, and as usual let us know what you are building!