Skip to content

Commit cbb688f

Browse files
committed
feat: add ai.set_scheduling function to change scheduling
The ai.set_scheduling function allows users to change the scheduling configuration of their vectorizers. This is important when moving from a self-hosted deployment using ai.scheduling_none to TigerData Cloud with ai.scheduling_timescaledb.
1 parent 30e06eb commit cbb688f

File tree

3 files changed

+311
-9
lines changed

3 files changed

+311
-9
lines changed

projects/pgai/db/sql/idempotent/012-vectorizer-api.sql

Lines changed: 73 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,7 @@ begin
5151
if embedding is null then
5252
raise exception 'embedding configuration is required';
5353
end if;
54-
54+
5555
if loading is null then
5656
raise exception 'loading configuration is required';
5757
end if;
@@ -173,7 +173,7 @@ begin
173173
raise notice 'a vectorizer named % already exists, skipping', name;
174174
return _existing_vectorizer_id;
175175
end if;
176-
176+
177177
-- validate the destination can create objects after the if_not_exists check
178178
perform ai._validate_destination_can_create_objects(destination);
179179

@@ -751,4 +751,74 @@ as $func$
751751
where v.name operator(pg_catalog.=) vectorizer_embed.name
752752
;
753753
$func$ language sql stable security invoker
754-
set search_path to pg_catalog, pg_temp;
754+
set search_path to pg_catalog, pg_temp;
755+
756+
757+
-------------------------------------------------------------------------------
-- set_scheduling
-- Changes the scheduling (and optionally indexing) configuration of an
-- existing vectorizer. Deletes the vectorizer's current timescaledb job (if
-- any), schedules a new one when the new scheduling config requires it, and
-- persists the merged config. Returns the vectorizer's full updated config.
-- Raises if the vectorizer does not exist, if either config fails validation,
-- or if automatic indexing is requested without scheduling.
create or replace function ai.set_scheduling
( vectorizer_id pg_catalog.int4
, scheduling pg_catalog.jsonb default ai.scheduling_default()
, indexing pg_catalog.jsonb default ai.indexing_default()
) returns pg_catalog.jsonb
as $func$
declare
    _job_id pg_catalog.int8;
    _updated_config pg_catalog.jsonb;
begin
    -- fail fast: otherwise a bogus id would still schedule a timescaledb job
    -- that is orphaned from any vectorizer, and the function would return null
    if not exists
    (
        select 1
        from ai.vectorizer v
        where v.id operator(pg_catalog.=) vectorizer_id
    ) then
        raise exception 'vectorizer with id % does not exist', vectorizer_id;
    end if;

    -- if ai.indexing_default, resolve the default
    if indexing operator(pg_catalog.->>) 'implementation' = 'default' then
        indexing = ai._resolve_indexing_default();
    end if;

    -- validate the indexing config
    perform ai._validate_indexing(indexing);

    -- if ai.scheduling_default, resolve the default
    if scheduling operator(pg_catalog.->>) 'implementation' = 'default' then
        scheduling = ai._resolve_scheduling_default();
    end if;

    -- validate the scheduling config
    perform ai._validate_scheduling(scheduling);

    -- if scheduling is none then indexing must also be none
    if scheduling operator(pg_catalog.->>) 'implementation' = 'none'
    and indexing operator(pg_catalog.->>) 'implementation' != 'none' then
        raise exception 'automatic indexing is not supported without scheduling. set indexing=>ai.indexing_none() when scheduling=>ai.scheduling_none()';
    end if;

    -- delete the current timescaledb job if one exists
    perform public.delete_job(job_id::pg_catalog.int4)
    from
    (
        select config operator(pg_catalog.#>>) array['scheduling', 'job_id'] as job_id
        from ai.vectorizer
        where id operator(pg_catalog.=) vectorizer_id
    ) c
    where job_id is not null;

    -- schedule the async ext job
    select ai._vectorizer_schedule_job
    ( vectorizer_id
    , scheduling
    ) into _job_id
    ;
    -- record the new job id in the scheduling config (null when scheduling is none)
    if _job_id is not null then
        scheduling = pg_catalog.jsonb_insert(scheduling, array['job_id'], pg_catalog.to_jsonb(_job_id));
    end if;

    update ai.vectorizer
    set config = config operator(pg_catalog.||) pg_catalog.jsonb_build_object
    ( 'scheduling'
    , scheduling
    , 'indexing'
    , indexing
    )
    where id operator(pg_catalog.=) vectorizer_id
    returning config into _updated_config;

    return _updated_config;
end
$func$ language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;

projects/pgai/db/tests/vectorizer/test_vectorizer.py

Lines changed: 165 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1115,7 +1115,7 @@ def index_creation_tester(cur: psycopg.Cursor, vectorizer_id: int) -> None:
11151115

11161116
# insert 5 rows into the target
11171117
cur.execute(f"""
1118-
insert into {vectorizer.config['destination']['target_schema']}.{vectorizer.config['destination']['target_table']}
1118+
insert into {vectorizer.config["destination"]["target_schema"]}.{vectorizer.config["destination"]["target_table"]}
11191119
( embedding_uuid
11201120
, id
11211121
, chunk_seq
@@ -1151,7 +1151,7 @@ def index_creation_tester(cur: psycopg.Cursor, vectorizer_id: int) -> None:
11511151

11521152
# insert 5 rows into the target
11531153
cur.execute(f"""
1154-
insert into {vectorizer.config['destination']['target_schema']}.{vectorizer.config['destination']['target_table']}
1154+
insert into {vectorizer.config["destination"]["target_schema"]}.{vectorizer.config["destination"]["target_table"]}
11551155
( embedding_uuid
11561156
, id
11571157
, chunk_seq
@@ -1416,7 +1416,7 @@ def test_index_create_concurrency():
14161416

14171417
# insert 10 rows into the target
14181418
cur.execute(f"""
1419-
insert into {vectorizer.config['destination']['target_schema']}.{vectorizer.config['destination']['target_table']}
1419+
insert into {vectorizer.config["destination"]["target_schema"]}.{vectorizer.config["destination"]["target_table"]}
14201420
( embedding_uuid
14211421
, id
14221422
, chunk_seq
@@ -1808,7 +1808,7 @@ def test_grant_to_public():
18081808
cur.execute(f"""
18091809
select has_table_privilege
18101810
( 'public'
1811-
, '{vectorizer.config['destination']['target_schema']}.{vectorizer.config['destination']['target_table']}'
1811+
, '{vectorizer.config["destination"]["target_schema"]}.{vectorizer.config["destination"]["target_table"]}'
18121812
, 'select'
18131813
)""")
18141814
assert cur.fetchone()[0]
@@ -2222,3 +2222,164 @@ def test_install_library_before_ai_extension():
22222222
with psycopg.connect(db_url("test")) as con:
22232223
with con.cursor() as cur:
22242224
cur.execute("create extension ai cascade")
2225+
2226+
2227+
@pytest.mark.skipif(
    os.getenv("PG_MAJOR") == "15", reason="extension does not support pg15"
)
def test_set_scheduling():
    """Integration test for ai.set_scheduling.

    Creates a vectorizer with a timescaledb schedule, then calls
    ai.set_scheduling to change the schedule interval and indexing config,
    and verifies that the old timescaledb job is deleted, a new job is
    created, and the vectorizer's stored config reflects the new settings.
    Requires a live PostgreSQL with the ai and timescaledb extensions.
    """
    with psycopg.connect(db_url("test")) as con:
        with con.cursor() as cur:
            cur.execute("create extension ai cascade")

    # create the test roles used for grant_to checks (idempotent)
    with psycopg.connect(
        db_url("postgres"), autocommit=True, row_factory=namedtuple_row
    ) as con:
        with con.cursor() as cur:
            cur.execute("create extension if not exists timescaledb")
            cur.execute("select to_regrole('bob') is null")
            if cur.fetchone()[0] is True:
                cur.execute("create user bob")
            cur.execute("select to_regrole('adelaide') is null")
            if cur.fetchone()[0] is True:
                cur.execute("create user adelaide")
    with psycopg.connect(
        db_url("test"), autocommit=True, row_factory=namedtuple_row
    ) as con:
        con.add_notice_handler(detailed_notice_handler)
        with con.cursor() as cur:
            # set up a fresh source schema/table with a few rows
            cur.execute("drop schema if exists website cascade")
            cur.execute("create schema website")
            cur.execute("drop table if exists website.blog")
            cur.execute("""
                create table website.blog
                ( id int not null generated always as identity
                , title text not null
                , published timestamptz
                , body text not null
                , drop_me text
                , primary key (title, published)
                )
            """)
            cur.execute(
                """grant select, insert, update, delete on website.blog to bob, adelaide"""
            )
            cur.execute("""grant usage on schema website to adelaide""")
            cur.execute("""
                insert into website.blog(title, published, body)
                values
                  ('how to cook a hot dog', '2024-01-06'::timestamptz, 'put it on a hot grill')
                , ('how to make a sandwich', '2023-01-06'::timestamptz, 'put a slice of meat between two pieces of bread')
                , ('how to make stir fry', '2022-01-06'::timestamptz, 'pick up the phone and order takeout')
            """)

            # drop the drop_me column
            cur.execute("alter table website.blog drop column drop_me")

            # create a vectorizer for the blog table
            # language=PostgreSQL
            cur.execute("""
            select ai.create_vectorizer
            ( 'website.blog'::regclass
            , loading => ai.loading_column('body')
            , embedding=>ai.embedding_openai('text-embedding-3-small', 768)
            , chunking=>ai.chunking_character_text_splitter(128, 10)
            , formatting=>ai.formatting_python_template('title: $title published: $published $chunk')
            , scheduling=>ai.scheduling_timescaledb
                    ( interval '5m'
                    , initial_start=>'2050-01-06'::timestamptz
                    , timezone=>'America/Chicago'
                    )
            , grant_to=>ai.grant_to('bob', 'fernando') -- bob is good. fernando doesn't exist. don't grant to adelaide
            );
            """)
            vectorizer_id = cur.fetchone()[0]

            # check the vectorizer that was created
            cur.execute(
                """
                select jsonb_pretty(to_jsonb(x) #- array['config', 'version'])
                from ai.vectorizer x
                where x.id = %s
                """,
                (vectorizer_id,),
            )
            actual = json.dumps(json.loads(cur.fetchone()[0]), sort_keys=True, indent=2)
            expected = json.dumps(json.loads(VECTORIZER_ROW), sort_keys=True, indent=2)
            assert actual == expected

            # get timescaledb job's job_id
            cur.execute(
                """
                select (x.config->'scheduling'->>'job_id')::int
                from ai.vectorizer x
                where x.id = %s
                """,
                (vectorizer_id,),
            )
            current_job_id = cur.fetchone()[0]

            # check the timescaledb job that was created
            cur.execute(
                """
                select j.schedule_interval = interval '5m'
                and j.proc_schema = 'ai'
                and j.proc_name = '_vectorizer_job'
                and j.scheduled = true
                and j.fixed_schedule = true
                as is_ok
                from timescaledb_information.jobs j
                where j.job_id = %s
                """,
                (current_job_id,),
            )
            actual = cur.fetchone()[0]
            assert actual is True

            # change the schedule interval (5m -> 30m) and indexing via ai.set_scheduling
            cur.execute(
                """
                select ai.set_scheduling
                ( %s
                , scheduling=>ai.scheduling_timescaledb
                        ( interval '30m'
                        , initial_start=>'2050-01-06'::timestamptz
                        , timezone=>'America/Chicago'
                        )
                , indexing=>ai.indexing_hnsw()
                )
                """,
                (vectorizer_id,),
            )

            # check the timescaledb old job that was deleted
            cur.execute(
                "select exists (select from timescaledb_information.jobs j where j.job_id = %s)",
                (current_job_id,),
            )
            exists = cur.fetchone()[0]
            assert not exists

            # the stored config must reflect the new schedule and indexing,
            # and carry a new (different) timescaledb job id
            cur.execute(
                "select config from ai.vectorizer where id = %s", (vectorizer_id,)
            )
            config = cur.fetchone()[0]
            assert config["scheduling"]["schedule_interval"] == "00:30:00"
            assert config["indexing"]["implementation"] == "hnsw"
            job_id = config["scheduling"]["job_id"]
            assert job_id != current_job_id

            # the replacement timescaledb job must exist with the new interval
            cur.execute(
                """
                select j.schedule_interval = interval '30m'
                and j.proc_schema = 'ai'
                and j.proc_name = '_vectorizer_job'
                and j.scheduled = true
                and j.fixed_schedule = true
                as is_ok
                from timescaledb_information.jobs j
                where j.job_id = %s
                """,
                (job_id,),
            )
            actual = cur.fetchone()[0]
            assert actual is True

projects/pgai/pgai/data/ai.sql

Lines changed: 73 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3603,7 +3603,7 @@ begin
36033603
if embedding is null then
36043604
raise exception 'embedding configuration is required';
36053605
end if;
3606-
3606+
36073607
if loading is null then
36083608
raise exception 'loading configuration is required';
36093609
end if;
@@ -3725,7 +3725,7 @@ begin
37253725
raise notice 'a vectorizer named % already exists, skipping', name;
37263726
return _existing_vectorizer_id;
37273727
end if;
3728-
3728+
37293729
-- validate the destination can create objects after the if_not_exists check
37303730
perform ai._validate_destination_can_create_objects(destination);
37313731

@@ -4305,6 +4305,77 @@ as $func$
43054305
$func$ language sql stable security invoker
43064306
set search_path to pg_catalog, pg_temp;
43074307

4308+
4309+
-------------------------------------------------------------------------------
-- set_scheduling
-- Changes the scheduling (and optionally indexing) configuration of an
-- existing vectorizer. Deletes the vectorizer's current timescaledb job (if
-- any), schedules a new one when the new scheduling config requires it, and
-- persists the merged config. Returns the vectorizer's full updated config.
-- Raises if the vectorizer does not exist, if either config fails validation,
-- or if automatic indexing is requested without scheduling.
create or replace function ai.set_scheduling
( vectorizer_id pg_catalog.int4
, scheduling pg_catalog.jsonb default ai.scheduling_default()
, indexing pg_catalog.jsonb default ai.indexing_default()
) returns pg_catalog.jsonb
as $func$
declare
    _job_id pg_catalog.int8;
    _updated_config pg_catalog.jsonb;
begin
    -- fail fast: otherwise a bogus id would still schedule a timescaledb job
    -- that is orphaned from any vectorizer, and the function would return null
    if not exists
    (
        select 1
        from ai.vectorizer v
        where v.id operator(pg_catalog.=) vectorizer_id
    ) then
        raise exception 'vectorizer with id % does not exist', vectorizer_id;
    end if;

    -- if ai.indexing_default, resolve the default
    if indexing operator(pg_catalog.->>) 'implementation' = 'default' then
        indexing = ai._resolve_indexing_default();
    end if;

    -- validate the indexing config
    perform ai._validate_indexing(indexing);

    -- if ai.scheduling_default, resolve the default
    if scheduling operator(pg_catalog.->>) 'implementation' = 'default' then
        scheduling = ai._resolve_scheduling_default();
    end if;

    -- validate the scheduling config
    perform ai._validate_scheduling(scheduling);

    -- if scheduling is none then indexing must also be none
    if scheduling operator(pg_catalog.->>) 'implementation' = 'none'
    and indexing operator(pg_catalog.->>) 'implementation' != 'none' then
        raise exception 'automatic indexing is not supported without scheduling. set indexing=>ai.indexing_none() when scheduling=>ai.scheduling_none()';
    end if;

    -- delete the current timescaledb job if one exists
    perform public.delete_job(job_id::pg_catalog.int4)
    from
    (
        select config operator(pg_catalog.#>>) array['scheduling', 'job_id'] as job_id
        from ai.vectorizer
        where id operator(pg_catalog.=) vectorizer_id
    ) c
    where job_id is not null;

    -- schedule the async ext job
    select ai._vectorizer_schedule_job
    ( vectorizer_id
    , scheduling
    ) into _job_id
    ;
    -- record the new job id in the scheduling config (null when scheduling is none)
    if _job_id is not null then
        scheduling = pg_catalog.jsonb_insert(scheduling, array['job_id'], pg_catalog.to_jsonb(_job_id));
    end if;

    update ai.vectorizer
    set config = config operator(pg_catalog.||) pg_catalog.jsonb_build_object
    ( 'scheduling'
    , scheduling
    , 'indexing'
    , indexing
    )
    where id operator(pg_catalog.=) vectorizer_id
    returning config into _updated_config;

    return _updated_config;
end
$func$ language plpgsql volatile security invoker
set search_path to pg_catalog, pg_temp
;
4377+
4378+
43084379
--------------------------------------------------------------------------------
43094380
-- 013-worker-tracking.sql
43104381
CREATE OR REPLACE FUNCTION ai._worker_start(version text, expected_heartbeat_interval interval) RETURNS uuid AS $$

0 commit comments

Comments
 (0)