Hands-On RAG guide for personal data with Vespa and LLamaIndex

This blog post is a hands-on RAG tutorial demonstrating how to use Vespa streaming mode for cost-efficient retrieval of personal data. You can read more about Vespa streaming search in these two blog posts:

This blog post is also available as a runnable notebook where you can have this app up and running on
Vespa Cloud in minutes
(
Open In Colab
)

The blog post covers:

  • Configuring Vespa and using Vespa streaming mode with PyVespa.
  • Using Vespa native built-in embedders in combination with streaming mode.
  • Ranking in Vespa, including hybrid retrieval and ranking methods, freshness (recency) features, and Vespa Rank Fusion.
  • Query federation and blending retrieved results from multiple sources/schemas.
  • Connecting LLamaIndex retrievers with a Vespa app to build generative AI pipelines.

TLDR; Vespa streaming mode

Vespa’s streaming search solution lets you make the user a part of the document ID so that Vespa can use it to co-locate the data of each user on a small set of nodes and the same chunk of disk.
Streaming mode allows searching over a user’s data with low latency without keeping any user’s data in memory or paying the cost of managing indexes.

  • There is no accuracy drop for vector search as it uses exact vector search
  • Several orders of magnitude higher write throughput (No expensive index builds to support approximate search)
  • Documents (including vector data) are 100% disk-based, significantly reducing deployment cost
  • Queries are restricted to content by the user ID/(groupname)

Storage cost is the primary cost driver of Vespa streaming mode; no data is in memory. Avoiding memory usage lowers deployment costs significantly.
For example, Vespa Cloud allows storing streaming mode data at below 0.30$ per GB/month. Yes, that is per month.

Getting started with LLamaIndex and PyVespa

The focus is on using the streaming mode feature in combination with multiple Vespa schemas; in our case,
we imagine building RAG over personal mail and calendar data, allowing effortless query federation and blending
of the results from multiple data sources for a given user.

First, we must install dependencies:

! pip3 install pyvespa llama-index

Synthetic Mail & Calendar Data

There are few public email datasets because people care about their privacy, so this notebook uses synthetic data to examine how to use Vespa streaming mode.
We create two generator functions that return Python dicts with synthetic mail and calendar data.

Notice that the dict has three keys:

This is the expected feed format for PyVespa feed operations and
where PyVespa will use these to build a Vespa document v1 API request(s).
The groupname key is only relevant with streaming mode.

mail

from typing import List

def synthetic_mail_data_generator() -> List[dict]:
    synthetic_mails = [
        {
            "id": 1,
            "groupname": "[email protected]",
            "fields": {
                "subject": "LlamaIndex news, 2023-11-14",
                "to": "[email protected]",
                "body": """Hello Llama Friends 🦙 LlamaIndex is 1 year old this week! 🎉 To celebrate, we're taking a stroll down memory 
                    lane on our blog with twelve milestones from our first year. Be sure to check it out.""",
                "from": "[email protected]",
                "display_date": "2023-11-15T09:00:00Z"
            }
        },
        {
            "id": 2,
            "groupname": "[email protected]",
            "fields": {
                "subject": "Dentist Appointment Reminder",
                "to": "[email protected]",
                "body": "Dear Jo Kristian ,\nThis is a reminder for your upcoming dentist appointment on 2023-12-04 at 09:30. Please arrive 15 minutes early.\nBest regards,\nDr. Dentist",
                "from": "[email protected]",
                "display_date": "2023-11-15T15:30:00Z"
            }
        },
        {
            "id": 1,
            "groupname": "[email protected]",
            "fields": {
                "subject": "Wildlife Update: Giraffe Edition",
                "to": "[email protected]",
                "body": "Dear Wildlife Enthusiasts 🦒, We're thrilled to share the latest insights into giraffe behavior in the wild. Join us on an adventure as we explore their natural habitat and learn more about these majestic creatures.",
                "from": "[email protected]",
                "display_date": "2023-11-12T14:30:00Z"
        }
        },
        {
            "id": 1,
            "groupname": "[email protected]",
            "fields": {
                "subject": "Antarctica Expedition: Penguin Chronicles",
                "to": "[email protected]",
                "body": "Greetings Explorers 🐧, Our team is embarking on an exciting expedition to Antarctica to study penguin colonies. Stay tuned for live updates and behind-the-scenes footage as we dive into the world of these fascinating birds.",
                "from": "[email protected]",
                "display_date": "2023-11-11T11:45:00Z"
            }
        },
        {
            "id": 1,
            "groupname": "[email protected]",
            "fields": {
                "subject": "Space Exploration News: November Edition",
                "to": "[email protected]",
                "body": "Hello Space Enthusiasts 🚀, Join us as we highlight the latest discoveries and breakthroughs in space exploration. From distant galaxies to new technologies, there's a lot to explore!",
                "from": "[email protected]",
                "display_date": "2023-11-01T16:20:00Z"
            }
         },
        {
            "id": 1,
            "groupname": "[email protected]",
            "fields": {
                "subject": "Ocean Discovery: Hidden Treasures Unveiled",
                "to": "[email protected]",
                "body": "Dear Ocean Explorers 🌊, Dive deep into the secrets of the ocean with our latest discoveries. From undiscovered species to underwater landscapes, our team is uncovering the wonders of the deep blue.",
                "from": "[email protected]",
                "display_date": "2023-10-01T10:15:00Z"
            }
        }
    ]
    for mail in synthetic_mails:
        yield mail  

calendar

Similarily, for calendar data

from typing import List

def synthetic_calendar_data_generator() -> List[dict]:
    calendar_data = [
       
        {
            "id": 1,
            "groupname": "[email protected]",
            "fields": {
                "subject": "Dentist Appointment",
                "to": "[email protected]",
                "body": "Dentist appointment at 2023-12-04 at 09:30 - 1 hour duration",
                "from": "[email protected]",
                "display_date": "2023-11-15T15:30:00Z",
                "duration": 60,
            }
        },
         {
            "id": 2,
            "groupname": "[email protected]",
            "fields": {
                "subject": "Public Cloud Platform Events",
                "to": "[email protected]",
                "body": "The cloud team continues to push new features and improvements to the platform. Join us for a live demo of the latest updates",
                "from": "public-cloud-platform-events",
                "display_date": "2023-11-21T09:30:00Z",
                "duration": 60,
            }
        }
    ]
    for event in calendar_data:
        yield event

Definining a Vespa application

PyVespa helps us build the Vespa application package.
A Vespa application package comprises configuration files, code (plugins), and models.

We define two Vespa schemas for our mail and calendar data. PyVespa
offers a programmatic API for creating the schema. Ultimately, the programmatic representation is serialized to files (<schema-name>.sd).

In the following we define the fields and their type. Note that we set mode to streaming,
which enables Vespa streaming mode for this schema.
Other valid modes are indexed and store-only.

mail schema

from vespa.package import Schema, Document, Field, FieldSet, HNSW
mail_schema = Schema(
            name="mail",
            mode="streaming",
            document=Document(
                fields=[
                    Field(name="id", type="string", indexing=["summary", "index"]),
                    Field(name="subject", type="string", indexing=["summary", "index"]),
                    Field(name="to", type="string", indexing=["summary", "index"]),
                    Field(name="from", type="string", indexing=["summary", "index"]),
                    Field(name="body", type="string", indexing=["summary", "index"]),
                    Field(name="display_date", type="string", indexing=["summary"]),
                    Field(name="timestamp", type="long", indexing=["input display_date", "to_epoch_second", "summary", "attribute"], is_document_field=False),
                    Field(name="embedding", type="tensor<bfloat16>(x[384])",
                        indexing=["\"passage: \" . input subject .\" \". input body", "embed e5", "attribute", "index"],
                        ann=HNSW(distance_metric="angular"),
                        is_document_field=False
                    )
                ],
            ),
            fieldsets=[
                FieldSet(name = "default", fields = ["subject", "body", "to", "from"])
            ]
)

In the mail schema, we have six document fields; these are provided by us when we feed documents of type mail to this app.
The fieldset defines
which fields are matched against when we do not mention explicit field names when querying. We can add as many fieldsets as we like without duplicating content.

In addition to the fields within the document, there are two synthetic fields in the schema, timestamp, and embedding,
using Vespa indexing expressions
taking inputs from the document and performing conversions.

  • the timestamp field takes the input display_date and uses the to_epoch_second converter converter to convert the
    display date into an epoch timestamp. This is useful because we can calculate the document’s age and use the freshness(timestamp) rank feature during ranking phases.
  • the embedding tensor field takes the subject and body as input. It feeds that into an embed function that uses an embedding model to map the string input into an embedding vector representation
    using 384-dimensions with bfloat16 precision. Vectors in Vespa are represented as Tensors.

calendar schema

from vespa.package import Schema, Document, Field, FieldSet, HNSW
calendar_schema = Schema(
            name="calendar",
            inherits="mail",
            mode="streaming",
            document=Document(inherits="mail",
                fields=[
                    Field(name="duration", type="int", indexing=["summary", "index"]),
                    Field(name="guests", type="array<string>", indexing=["summary", "index"]),
                    Field(name="location", type="string", indexing=["summary", "index"]),
                    Field(name="url", type="string", indexing=["summary", "index"]),
                    Field(name="address", type="string", indexing=["summary", "index"])
                ]
            )
)

The calendar schema inherits from the mail schema, meaning we don’t have to define the embedding field for the
calendar schema.

Configuring embedders

The observant reader might have noticed the e5 argument to the embed expression in the above mail schema embedding field.
The e5 argument references a component of the type hugging-face-embedder. In this
example, we use the e5-small-v2 text embedding model that maps text to 384-dimensional vectors.

from vespa.package import ApplicationPackage, Component, Parameter

vespa_app_name = "assistant"
vespa_application_package = ApplicationPackage(
        name=vespa_app_name,
        schema=[mail_schema, calendar_schema],
        components=[Component(id="e5", type="hugging-face-embedder",
            parameters=[
                Parameter("transformer-model", {"url": "https://github.com/vespa-engine/sample-apps/raw/master/simple-semantic-search/model/e5-small-v2-int8.onnx"}),
                Parameter("tokenizer-model", {"url": "https://raw.githubusercontent.com/vespa-engine/sample-apps/master/simple-semantic-search/model/tokenizer.json"})
            ]
        )]
) 

We share and reuse the same embedding model for both schemas. Note that embedding inference is resource-intensive.

Ranking

In the last step of configuring the Vespa app, we add ranking profiles by adding rank-profile’s to the schemas. Vespa supports phased ranking and has a rich set of built-in rank-features.

One can also define custom functions with ranking expressions.

from vespa.package import RankProfile, Function, GlobalPhaseRanking, FirstPhaseRanking

keywords_and_freshness = RankProfile(
    name="default", 
    functions=[Function(
        name="my_function", expression="nativeRank(subject) + nativeRank(body) + freshness(timestamp)"
    )],
    first_phase=FirstPhaseRanking(
        expression="my_function",
        rank_score_drop_limit=0.02
    ),
    match_features=["nativeRank(subject)", "nativeRank(body)", "my_function", "freshness(timestamp)"],
)

semantic = RankProfile(
    name="semantic", 
    functions=[Function(
        name="cosine", expression="max(0,cos(distance(field, embedding)))"
    )],
    inputs=[("query(q)", "tensor<float>(x[384])"), ("query(threshold)","", "0.75")],
    first_phase=FirstPhaseRanking(
        expression="if(cosine > query(threshold), cosine, -1)",
        rank_score_drop_limit=0.1
    ),
    match_features=["cosine", "freshness(timestamp)", "distance(field, embedding)", "query(threshold)"],
)

fusion = RankProfile(
    name="fusion",
    inherits="semantic",
    functions=[
        Function(
            name="keywords_and_freshness", expression=" nativeRank(subject) + nativeRank(body) + freshness(timestamp)"
        ),
        Function(
            name="semantic", expression="cos(distance(field,embedding))"
        )

    ],
    inputs=[("query(q)", "tensor<float>(x[384])"), ("query(threshold)", "", "0.75")],
    first_phase=FirstPhaseRanking(
        expression="if(cosine > query(threshold), cosine, -1)"

Open Sourcing Vespa, Yahoo’s Big Data Processing and Serving Engine

By Jon Bratseth, Distinguished Architect, Vespa

Ever since we open sourced Hadoop in 2006, Yahoo – and now, Oath – has been committed to opening up its big data infrastructure to the larger developer community. Today, we are taking another major step in this direction by making Vespa, Yahoo’s big data processing and serving engine, available as open source on GitHub.

Building applications increasingly means dealing with huge amounts of data. While developers can use the Hadoop stack to store and batch process big data, and Storm to stream-process data, these technologies do not help with serving results to end users. Serving is challenging at large scale, especially when it is necessary to make computations quickly over data while a user is waiting, as with applications that feature search, recommendation, and personalization.

By releasing Vespa, we are making it easy for anyone to build applications that can compute responses to user requests, over large datasets, at real time and at internet scale – capabilities that up until now, have been within reach of only a few large companies.

Serving often involves more than looking up items by ID or computing a few numbers from a model. Many applications need to compute over large datasets at serving time. Two well-known examples are search and recommendation. To deliver a search result or a list of recommended articles to a user, you need to find all the items matching the query, determine how good each item is for the particular request using a relevance/recommendation model, organize the matches to remove duplicates, add navigation aids, and then return a response to the user. As these computations depend on features of the request, such as the user’s query or interests, it won’t do to compute the result upfront. It must be done at serving time, and since a user is waiting, it has to be done fast. Combining speedy completion of the aforementioned operations with the ability to perform them over large amounts of data requires a lot of infrastructure – distributed algorithms, data distribution and management, efficient data structures and memory management, and more. This is what Vespa provides in a neatly-packaged and easy to use engine.

With over 1 billion users, we currently use Vespa across many different Oath brands – including Yahoo.com, Yahoo News, Yahoo Sports, Yahoo Finance, Yahoo Gemini, Flickr, and others – to process and serve billions of daily requests over billions of documents while responding to search queries, making recommendations, and providing personalized content and advertisements, to name just a few use cases. In fact, Vespa processes and serves content and ads almost 90,000 times every second with latencies in the tens of milliseconds. On Flickr alone, Vespa performs keyword and image searches on the scale of a few hundred queries per second on tens of billions of images. Additionally, Vespa makes direct contributions to our company’s revenue stream by serving over 3 billion native ad requests per day via Yahoo Gemini, at a peak of 140k requests per second (per Oath internal data).

With Vespa, our teams build applications that:

  • Select content items using SQL-like queries and text search
  • Organize all matches to generate data-driven pages
  • Rank matches by handwritten or machine-learned relevance models
  • Serve results with response times in the low milliseconds
  • Write data in real-time, thousands of times per second per node
  • Grow, shrink, and re-configure clusters while serving and writing data

To achieve both speed and scale, Vespa distributes data and computation over many machines without any single master as a bottleneck. Where conventional applications work by pulling data into a stateless tier for processing, Vespa instead pushes computations to the data. This involves managing clusters of nodes with background redistribution of data in case of machine failures or the addition of new capacity, implementing distributed low latency query and processing algorithms, handling distributed data consistency, and a lot more. It’s a ton of hard work!

As the team behind Vespa, we have been working on developing search and serving capabilities ever since building alltheweb.com, which was later acquired by Yahoo. Over the last couple of years we have rewritten most of the engine from scratch to incorporate our experience onto a modern technology stack. Vespa is larger in scope and lines of code than any open source project we’ve ever released. Now that this has been battle-proven on Yahoo’s largest and most critical systems, we are pleased to release it to the world.

Vespa gives application developers the ability to feed data and models of any size to the serving system and make the final computations at request time. This often produces a better user experience at lower cost (for buying and running hardware) and complexity compared to pre-computing answers to requests. Furthermore it allows developers to work in a more interactive way where they navigate and interact with complex calculations in real time, rather than having to start offline jobs and check the results later.

Vespa can be run on premises or in the cloud. We provide both Docker images and rpm packages for Vespa, as well as guides for running them both on your own laptop or as an AWS cluster.

We’ll follow up this initial announcement with a series of posts on our blog showing how to build a real-world application with Vespa, but you can get started right now by following the getting started guide in our comprehensive documentation.

Managing distributed systems is not easy. We have worked hard to make it easy to develop and operate applications on Vespa so that you can focus on creating features that make use of the ability to compute over large datasets in real time, rather than the details of managing clusters and data. You should be able to get an application up and running in less than ten minutes by following the documentation.

We can’t wait to see what you’ll build with it!

Join us at the Big Data Technology Warsaw Summit on February 27th for Scalable Machine-Learned Model Serving

Online evaluation of machine-learned models (model serving) is difficult to scale to large datasets. Vespa.ai is an open source big data serving solution used to solve this problem and in use today on some of the largest such systems in the world. These systems evaluate models over millions of data points per request for hundreds of thousands of requests per second.

If you’re in Warsaw on February 27th, please join Jon Bratseth (Distinguished Architect, Verizon Media) at the Big Data Technology Warsaw Summit, where he’ll share “Scalable machine-learned model serving” and answer any questions. Big Data Technology Warsaw Summit is a one-day conference with technical content focused on big data analysis, scalability, storage, and search. There will be 27 presentations and more than 500 attendees are expected.

Jon’s talk will explore the problem and architectural solution, show how Vespa can be used to achieve scalable serving of TensorFlow and ONNX models, and present benchmarks comparing performance and scalability to TensorFlow Serving.

Hope to see you there!

The big data maturity levels

By now it’s well known that making effective use of data is a competitive advantage. But how advanced is your
organization at making use of data?

Over the last few years I have spoken to many organizations on this topic.
One thing I’ve learned is that all of them go through the same learning process in putting their data to work.
From Silicon Valley giants to industry companies in Asia and government entities in Europe, all go through the
same main evolutionary stages. I call these the big data maturity levels.

The big data maturity levels

Level 0: Latent

Data is produced by the normal course of operations of the organization, but is not systematically used to
make decisions.

Example: A movie streaming service is logging each movie viewing event with information about what is viewed,
and by whom.

Level 1: Analysis

Data is used by humans to make decisions. Analysts extract information from the data, such as graphs and figures
showing statistics, which is used by humans to inform their decision making.

Example: A movie streaming service uses logs to produce lists of the most viewed movies broken down by user
attributes. Editors use these to create curated movie recommendations to important segments of users.

Relevant technologies at this level include traditional data warehouses, data analytics platforms such as
Splunk and Elastic Search, and big data query engines
such as Spark.

Level 2: Learning

Data is used to learn and compute the decisions that will be needed to achieve a given objective. Rather
than making each decision directly from the data, humans take a step back from the details of the data and
instead formulate objectives and set up a situation where the system can learn the decisions that achieve
them directly from the data. The offline system both learn which decisions to make and computes the right
decisions for use in the future.

Example: A movie streaming service uses machine learning to periodically compute lists of movie
recommendations for each user segment.

Relevant technologies at this level include machine learning tools such as
TensorFlow and PyTorch,
machine learning platforms such as Michelangelo,
and tooling for offline processing and machine learning at scale such as Hadoop.

Level 3: Acting

Data is used to make decisions in real time. Rather than pre-computing decisions offline, decisions are
made at the moment they are needed. This makes it possible to take all relevant information into account
and base decisions on up-to-date information about the world.

Example: A movie streaming service computes recommended movies for each particular user at the point when
they access the service.

Relevant technologies: Some times it is possible to make decisions by considering a single data point.
In those cases model serving tools such as TensorFlow Serving,
or stream processing tools such as
Storm
and Flink may be used. In general — as in the movie streaming example –
multiple data items are needed to
make each decision, which can is achieved using a big data serving engine such as
Vespa.

Conclusion

Taking a step back and reflecting on the maturity level of your organization (or team — organizations
don’t always evolve in synchronicity) can be helpful in understanding the current type of challenges you
face, what kinds of technologies you should consider, and what’s needed to move to the next level in
your organization.

I hope this post has been helpful in this — it’s the first post in a series exploring this topic. In
the next posts, I’ll take a look at the forces that pushes the worlds most advanced organizations to
move to maturity level 3, the benefits they see from making this move, and why this has traditionally
been so hard to pull off.

Doubling the throughput of data redistribution

Geir Storli

Geir Storli

Senior Principal Vespa Engineer

Tor Brede Vekterli

Tor Brede Vekterli

Senior Principal Vespa Engineer


Photo by Andy Holmes
on Unsplash

Vespa automatically keeps data distributed over content nodes and
redistributes data in the background when nodes are added or removed – examples are node failure,
optimized node configuration or cluster growth.

In the past months we have worked on improving the performance of this data redistribution.
We have been able to double the throughput, cutting the time it takes to replace a failing content node in half.

In this blog post we give an overview of which improvements we made,
with some performance numbers from a Vespa application.
All these improvements are part of Vespa 7.528.3.

Introduction

Data in Vespa is modeled as documents.
The document space is split into logical chunks called buckets,
and each document is mapped to a single bucket based on its document id.
Buckets are automatically distributed over available nodes in a content cluster using a configured redundancy level.
Nodes can be added to
or removed from the content cluster at any time,
and Vespa will redistribute data in the background
with minimal impact to query or write traffic.

No explicit sharding or manual decision making is needed. In the case of a failed node,
data redistribution is automatically run to rebuild the configured redundancy level of the data among the remaining nodes.
When retiring a node, its replicas are gradually moved to the remaining nodes.
It eventually ends up storing no data and receiving no traffic.
At this point the node can be safely removed from the cluster.

Data redistribution is an integral part of the automatic node management provided in Vespa Cloud.
This includes the detection and replacement of nodes with hardware issues, upgrading OS version,
increasing or decreasing content cluster capacity,
and the optimization of node allocation using autoscaling.

Data redistribution and improvements

Data redistribution in a content cluster is handled by a set of maintenance operations.
The most important ones are merge bucket and delete bucket.

If bucket replicas across content nodes do not store the same document versions,
they are said to be out of sync. When this happens, a merge bucket operation is executed to re-sync the bucket replicas.
This operation is scheduled by the distributor
and sent to the content nodes (proton) for processing.
It is a complex operation, involving multiple processes and threads, and bottlenecks were discovered in several components.

A delete bucket operation removes a bucket replica from a node.
It is primarily used to remove leftover data on a node after its replica contents have been copied to another node.
Removing a document requires it to be removed from all the field indexes and attributes it is part of.
This is similar in performance cost to inserting and indexing the document during feeding.
When deleting a replica, many documents are removed in one go.
This makes the delete bucket operation costly compared to client operations as put, update and remove,
and handling it must be as fast as possible to avoid latency spikes.

The following list summarizes on a high level which improvements were made to remove the bottlenecks found,
avoid latency spikes and improve the throughput of data redistribution.

Distributor:

  • Enhanced the maintenance operation scheduling semantics to avoid potential head-of-line blocking of later buckets
    in the priority database.
  • Removed distribution key
    ordering requirements for merge chains to avoid load skew to content nodes with low distribution keys.

Proton:

  • Made all operations related to data redistribution async to allow for better throughput.
  • Prioritized merge bucket operations from content nodes (proton)
    higher than merge operations from distributor nodes,
    never rejecting them due to merge queue limits.
  • Optimized delete bucket handling in document meta store,
    and for index and attribute fields.
  • Moved extraction of document field values to index and attribute writer threads.
  • Removed stop-the-world syncing of executor thread pools used in the feed pipeline.
  • Changed the executor thread implementation used in the feed pipeline to one that is optimized for throughput instead of latency.
  • Changed writing of index fields to use the same executor thread pool for both
    inverting fields and pushing the result to the memory index.

Performance

Several Vespa applications running in the Vespa Cloud
were analyzed to pinpoint bottlenecks and to measure the effectiveness of the improvements made.

One of the applications uses a grouped content distribution
with 3 groups of 4 content nodes each. The redundancy inside each group is 1,
meaning each document is 3-way replicated across the entire cluster.
Each content node has 72 vcpus, 128 GB memory, 400 GB disk,
and contains 230 million documents with a total size of 311 GB.
When replacing a content node, data is redistributed across the three other nodes in the group and the replacement node.
The bottleneck in this process is redistributing the data to the replacement node.

The baseline throughput was an average of 22.5 MB/sec of data to the replacement node. The whole process took 3:50 hours.
With all improvements in place we achieved an average of 44 MB/sec, taking close to 2 hours.
The throughput of data redistribution was almost doubled. Similar improvements were observed for other applications.

Summary

In this blog post we have looked at which improvements we made to double the throughput of data redistribution in Vespa
with minimal impact to query or write traffic. From an operational point of view this is important,
as the time it takes from a failing node is discovered until the redundancy level is re-established is cut in half.
All these improvements are part of Vespa 7.528.3.