
Commit 210296a

gadhagod and baskaryan authored
Integrate Rockset as a document loader (#7681)
Integrate [Rockset](https://rockset.com/docs/) as a document loader.

Issue: None

Dependencies: Nothing new (rockset's dependency was already added [here](#6216))

Tag maintainer: @rlancemartin

I have added a test for the integration and an example notebook showing its use. I ran `make lint` and everything looks good.

---------

Co-authored-by: Bagatur <[email protected]>
1 parent ad7d976 commit 210296a

File tree

5 files changed: +435 −0 lines changed


docs/extras/ecosystem/integrations/rockset.mdx

Lines changed: 7 additions & 0 deletions
````diff
@@ -17,3 +17,10 @@ See a [usage example](/docs/modules/data_connection/vectorstores/integrations/ro
 ```python
 from langchain.vectorstores import RocksetDB
 ```
+
+## Document Loader
+
+See a [usage example](/docs/modules/data_connection/document_loaders/integrations/rockset).
+```python
+from langchain.document_loaders import RocksetLoader
+```
````
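A minimal usage sketch (mirroring the example notebook added in this PR; the region, API key, and collection name are placeholders):

```python
from langchain.document_loaders import RocksetLoader
from rockset import Regions, RocksetClient, models

# Placeholders: substitute your own region, API key, and collection.
loader = RocksetLoader(
    RocksetClient(Regions.usw2a1, "<api key>"),
    models.QueryRequestSql(query="SELECT * FROM langchain_demo LIMIT 3"),
    ["text"],  # column(s) written into page_content
)
docs = loader.load()
```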
New file: example notebook for the Rockset document loader

Lines changed: 251 additions & 0 deletions

@@ -0,0 +1,251 @@
# Rockset

> Rockset is a real-time analytics database which enables queries on massive, semi-structured data without operational burden. With Rockset, ingested data is queryable within one second and analytical queries against that data typically execute in milliseconds. Rockset is compute optimized, making it suitable for serving high-concurrency applications in the sub-100TB range (or larger than 100s of TBs with rollups).

This notebook demonstrates how to use Rockset as a document loader in langchain. To get started, make sure you have a Rockset account and an API key available.

## Setting up the environment

1. Go to the [Rockset console](https://console.rockset.com/apikeys) and get an API key. Find your API region from the [API reference](https://rockset.com/docs/rest-api/#introduction). For the purposes of this notebook, we will assume you're using Rockset from `Oregon (us-west-2)`.
2. Set the environment variable `ROCKSET_API_KEY` (a short snippet below shows reading it from Python).
3. Install the Rockset python client, which will be used by langchain to interact with the Rockset database.

```shell
$ pip3 install rockset
```
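If you would rather not paste the key inline in the examples that follow, here is a minimal sketch for building the client from the environment (it assumes `ROCKSET_API_KEY` was exported in step 2 and mirrors the positional `RocksetClient(Regions.usw2a1, ...)` call used below):

```python
import os

from rockset import Regions, RocksetClient

# Assumes ROCKSET_API_KEY was exported in step 2; Regions.usw2a1 is Oregon (us-west-2).
rockset_client = RocksetClient(Regions.usw2a1, os.environ["ROCKSET_API_KEY"])
```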
## Loading Documents

The Rockset integration with LangChain lets you load documents from Rockset collections with SQL queries. To do this, you construct a `RocksetLoader` object. Here is an example snippet that initializes a `RocksetLoader`.

```python
from langchain.document_loaders import RocksetLoader
from rockset import RocksetClient, Regions, models

loader = RocksetLoader(
    RocksetClient(Regions.usw2a1, "<api key>"),
    models.QueryRequestSql(query="SELECT * FROM langchain_demo LIMIT 3"),  # SQL query
    ["text"],  # content columns
    metadata_keys=["id", "date"],  # metadata columns
)
```

Here, you can see that the following query is run:

```sql
SELECT * FROM langchain_demo LIMIT 3
```

The `text` column in the collection is used as the page content, and the record's `id` and `date` columns are used as metadata (if you do not pass anything into `metadata_keys`, the whole Rockset document will be used as metadata).

To execute the query and access an iterator over the resulting `Document`s, run:

```python
loader.lazy_load()
```
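Because `lazy_load()` returns an iterator, you can stream the resulting `Document`s instead of materializing them all at once. A minimal sketch (the `id` metadata key follows the example above):

```python
# Stream Documents one at a time; each carries page_content and the selected metadata.
for document in loader.lazy_load():
    print(document.metadata["id"], document.page_content[:80])
```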
To execute the query and access all resulting `Document`s at once, run:

```python
loader.load()
```

Here is an example response of `loader.load()`:

```python
[
    Document(
        page_content="Lorem ipsum dolor sit amet, consectetur adipiscing elit. Maecenas a libero porta, dictum ipsum eget, hendrerit neque. Morbi blandit, ex ut suscipit viverra, enim velit tincidunt tellus, a tempor velit nunc et ex. Proin hendrerit odio nec convallis lobortis. Aenean in purus dolor. Vestibulum orci orci, laoreet eget magna in, commodo euismod justo.",
        metadata={"id": 83209, "date": "2022-11-13T18:26:45.000000Z"}
    ),
    Document(
        page_content="Integer at finibus odio. Nam sit amet enim cursus lacus gravida feugiat vestibulum sed libero. Aenean eleifend est quis elementum tincidunt. Curabitur sit amet ornare erat. Nulla id dolor ut magna volutpat sodales fringilla vel ipsum. Donec ultricies, lacus sed fermentum dignissim, lorem elit aliquam ligula, sed suscipit sapien purus nec ligula.",
        metadata={"id": 89313, "date": "2022-11-13T18:28:53.000000Z"}
    ),
    Document(
        page_content="Morbi tortor enim, commodo id efficitur vitae, fringilla nec mi. Nullam molestie faucibus aliquet. Praesent a est facilisis, condimentum justo sit amet, viverra erat. Fusce volutpat nisi vel purus blandit, et facilisis felis accumsan. Phasellus luctus ligula ultrices tellus tempor hendrerit. Donec at ultricies leo.",
        metadata={"id": 87732, "date": "2022-11-13T18:49:04.000000Z"}
    )
]
```

## Using multiple columns as content

You can choose to use multiple columns as content:

```python
from langchain.document_loaders import RocksetLoader
from rockset import RocksetClient, Regions, models

loader = RocksetLoader(
    RocksetClient(Regions.usw2a1, "<api key>"),
    models.QueryRequestSql(query="SELECT * FROM langchain_demo WHERE id=38 LIMIT 1"),
    ["sentence1", "sentence2"],  # TWO content columns
)
```

Assuming the "sentence1" field is `"This is the first sentence."` and the "sentence2" field is `"This is the second sentence."`, the `page_content` of the resulting `Document` would be:

```
This is the first sentence.
This is the second sentence.
```

You can define your own function to join content columns by setting the `content_columns_joiner` argument in the `RocksetLoader` constructor. `content_columns_joiner` is a method that takes a `List[Tuple[str, Any]]` as an argument, representing a list of tuples of (column name, column value). By default, it joins each column value with a new line.

For example, if you wanted to join `sentence1` and `sentence2` with a space instead of a new line, you could set `content_columns_joiner` like so:

```python
RocksetLoader(
    RocksetClient(Regions.usw2a1, "<api key>"),
    models.QueryRequestSql(query="SELECT * FROM langchain_demo WHERE id=38 LIMIT 1"),
    ["sentence1", "sentence2"],
    content_columns_joiner=lambda docs: " ".join(
        [doc[1] for doc in docs]
    ),  # join with a space instead of "\n"
)
```

The `page_content` of the resulting `Document` would be:

```
This is the first sentence. This is the second sentence.
```

Oftentimes you want to include the column name in the `page_content`. You can do that like this:

```python
RocksetLoader(
    RocksetClient(Regions.usw2a1, "<api key>"),
    models.QueryRequestSql(query="SELECT * FROM langchain_demo WHERE id=38 LIMIT 1"),
    ["sentence1", "sentence2"],
    content_columns_joiner=lambda docs: "\n".join(
        [f"{doc[0]}: {doc[1]}" for doc in docs]
    ),
)
```

This would result in the following `page_content`:

```
sentence1: This is the first sentence.
sentence2: This is the second sentence.
```

langchain/document_loaders/__init__.py

Lines changed: 2 additions & 0 deletions
```diff
@@ -104,6 +104,7 @@
 from langchain.document_loaders.recursive_url_loader import RecursiveUrlLoader
 from langchain.document_loaders.reddit import RedditPostsLoader
 from langchain.document_loaders.roam import RoamLoader
+from langchain.document_loaders.rocksetdb import RocksetLoader
 from langchain.document_loaders.rst import UnstructuredRSTLoader
 from langchain.document_loaders.rtf import UnstructuredRTFLoader
 from langchain.document_loaders.s3_directory import S3DirectoryLoader
@@ -248,6 +249,7 @@
     "RecursiveUrlLoader",
     "RedditPostsLoader",
     "RoamLoader",
+    "RocksetLoader",
     "S3DirectoryLoader",
     "S3FileLoader",
     "SRTLoader",
```
langchain/document_loaders/rocksetdb.py (new file)

Lines changed: 115 additions & 0 deletions

@@ -0,0 +1,115 @@

```python
from typing import Any, Callable, Iterator, List, Optional, Tuple

from langchain.document_loaders.base import BaseLoader
from langchain.schema import Document


def default_joiner(docs: List[Tuple[str, Any]]) -> str:
    return "\n".join([doc[1] for doc in docs])


class ColumnNotFoundError(Exception):
    def __init__(self, missing_key: str, query: str):
        super().__init__(f'Column "{missing_key}" not selected in query:\n{query}')


class RocksetLoader(BaseLoader):
    """Wrapper around Rockset db

    To use, you should have the `rockset` python package installed.

    Example:
        .. code-block:: python

            # This code will load 3 records from the "langchain_demo"
            # collection as Documents, with the `text` column used as
            # the content

            from langchain.document_loaders import RocksetLoader
            from rockset import RocksetClient, Regions, models

            loader = RocksetLoader(
                RocksetClient(Regions.usw2a1, "<api key>"),
                models.QueryRequestSql(
                    query="select * from langchain_demo limit 3"
                ),
                ["text"],
            )
    """

    def __init__(
        self,
        client: Any,
        query: Any,
        content_keys: List[str],
        metadata_keys: Optional[List[str]] = None,
        content_columns_joiner: Callable[[List[Tuple[str, Any]]], str] = default_joiner,
    ):
        """Initialize with Rockset client.

        Args:
            client: Rockset client object.
            query: Rockset query object.
            content_keys: The collection columns to be written into the `page_content`
                of the Documents.
            metadata_keys: The collection columns to be written into the `metadata` of
                the Documents. By default, this is all the keys in the document.
            content_columns_joiner: Method that joins content_keys and their values
                into a string. It takes a List[Tuple[str, Any]], representing a list
                of tuples of (column name, column value). By default, it joins each
                column value with a new line. This method is only relevant if there
                are multiple content_keys.
        """
        try:
            from rockset import QueryPaginator, RocksetClient
            from rockset.models import QueryRequestSql
        except ImportError:
            raise ImportError(
                "Could not import rockset client python package. "
                "Please install it with `pip install rockset`."
            )

        if not isinstance(client, RocksetClient):
            raise ValueError(
                f"client should be an instance of rockset.RocksetClient, "
                f"got {type(client)}"
            )

        if not isinstance(query, QueryRequestSql):
            raise ValueError(
                f"query should be an instance of rockset.model.QueryRequestSql, "
                f"got {type(query)}"
            )

        self.client = client
        self.query = query
        self.content_keys = content_keys
        self.content_columns_joiner = content_columns_joiner
        self.metadata_keys = metadata_keys
        self.paginator = QueryPaginator
        self.request_model = QueryRequestSql

    def load(self) -> List[Document]:
        return list(self.lazy_load())

    def lazy_load(self) -> Iterator[Document]:
        query_results = self.client.Queries.query(
            sql=self.query
        ).results  # execute the SQL query
        for doc in query_results:  # for each doc in the response
            try:
                yield Document(
                    page_content=self.content_columns_joiner(
                        [(col, doc[col]) for col in self.content_keys]
                    ),
                    metadata={col: doc[col] for col in self.metadata_keys}
                    if self.metadata_keys is not None
                    else doc,
                )  # try to yield the Document
            except (
                KeyError
            ) as e:  # either content_columns or metadata_columns is invalid
                raise ColumnNotFoundError(
                    e.args[0], self.query
                )  # raise that the column isn't in the db schema
```