Skip to content

Commit 87d1023

Browse files
committed
Setup CI for adapter with service dependencies
Signed-off-by: Tim Paine <[email protected]>
1 parent 90739ab commit 87d1023

File tree

7 files changed

+125
-104
lines changed

7 files changed

+125
-104
lines changed

.github/workflows/build.yml

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -554,7 +554,6 @@ jobs:
554554
env:
555555
CSP_TEST_SKIP_EXAMPLES: "1"
556556

557-
558557
#################################
559558
#~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~#
560559
#~~~~~~~~~|##########|~~~~~~~~~~#
@@ -620,6 +619,79 @@ jobs:
620619
run: make test TEST_ARGS="-k TestDBReader"
621620
if: ${{ contains( 'sqlalchemy', matrix.package )}}
622621

622+
###########################
623+
#~~~~~~~~~~~~~~~~~~~~~~~~~#
624+
#~~~~~~|#############|~~~~#
625+
#~~~~~~|#|~~~~~~~/##/~~~~~#
626+
#~~~~~~|#|~~~~~/##/~~~~~~~#
627+
#~~~~~~~~~~~~/##/~~~~~~~~~#
628+
#~~~~~~~~~~/##/~~~~~~~~~~~#
629+
#~~~~~~~~/##/~~~~~~~~~~~~~#
630+
#~~~~~~/##/~~~~~~~~~~~~~~~#
631+
#~~~~~~~~~~~~~~~~~~~~~~~~~#
632+
# Test Service Adapters #
633+
#~~~~~~~~~~~~~~~~~~~~~~~~~#
634+
test_adapters:
635+
needs:
636+
- initialize
637+
- build
638+
639+
strategy:
640+
matrix:
641+
os:
642+
- ubuntu-20.04
643+
python-version:
644+
- 3.9
645+
adapter:
646+
- kafka
647+
648+
runs-on: ${{ matrix.os }}
649+
650+
steps:
651+
- name: Checkout
652+
uses: actions/checkout@v4
653+
with:
654+
submodules: recursive
655+
656+
- name: Set up Python ${{ matrix.python-version }}
657+
uses: ./.github/actions/setup-python
658+
with:
659+
version: '${{ matrix.python-version }}'
660+
661+
- name: Install python dependencies
662+
run: make requirements
663+
664+
- name: Install test dependencies
665+
shell: bash
666+
run: sudo apt-get install graphviz
667+
668+
# Download artifact
669+
- name: Download wheel
670+
uses: actions/download-artifact@v4
671+
with:
672+
name: csp-dist-${{ runner.os }}-${{ runner.arch }}-${{ matrix.python-version }}
673+
674+
- name: Install wheel
675+
run: python -m pip install -U *manylinux2014*.whl --target .
676+
677+
- name: Spin up adapter service
678+
run: make dockerup ADAPTER=${{ matrix.adapter }} DOCKERARGS="--wait --wait-timeout 30"
679+
680+
- name: Wait 30 seconds for services to settle after images have been spun up
681+
run: sleep 30
682+
683+
# Run tests
684+
- name: Setup test flags
685+
shell: bash
686+
run: echo "CSP_TEST_$( echo ${{ matrix.adapter }} | awk '{print toupper($0)}' )=1" >> $GITHUB_ENV
687+
688+
- name: Python Test Steps
689+
run: make test TEST_ARGS="-k ${{ matrix.adapter }}"
690+
691+
- name: Spin down adapter service
692+
run: make dockerdown ADAPTER=${{ matrix.adapter }}
693+
if: ${{ always() }}
694+
623695
#############################
624696
#~~~~~~~~~~~~~~~~~~~~~~~~~~~#
625697
#~~~~~~|#############|~~~~~~#

Makefile

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -86,15 +86,16 @@ tests: test
8686

8787
.PHONY: dockerup dockerps dockerdown initpodmanmac
8888
ADAPTER := kafka
89-
DOCKER := podman
89+
DOCKER := docker
90+
DOCKERARGS :=
9091

9192
initpodmanmac:
9293
podman machine stop
9394
podman machine set --cpus 4 --memory 8096
9495
podman machine start
9596

9697
dockerup: ## spin up docker compose services for adapter testing
97-
$(DOCKER) compose -f ci/$(ADAPTER)/docker-compose.yml up -d
98+
$(DOCKER) compose -f ci/$(ADAPTER)/docker-compose.yml up -d $(DOCKERARGS)
9899

99100
dockerps: ## show status of docker compose services for adapter testing
100101
$(DOCKER) compose -f ci/$(ADAPTER)/docker-compose.yml ps

ci/kafka/docker-compose.yml

Lines changed: 25 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -87,84 +87,6 @@ services:
8787
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
8888
CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
8989

90-
control-center:
91-
image: confluentinc/cp-enterprise-control-center:7.5.3
92-
hostname: control-center
93-
container_name: control-center
94-
depends_on:
95-
- broker
96-
- schema-registry
97-
- connect
98-
- ksqldb-server
99-
ports:
100-
- "9021:9021"
101-
environment:
102-
CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
103-
CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
104-
CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
105-
CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
106-
CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
107-
CONTROL_CENTER_REPLICATION_FACTOR: 1
108-
CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
109-
CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
110-
CONFLUENT_METRICS_TOPIC_REPLICATION: 1
111-
PORT: 9021
112-
113-
ksqldb-server:
114-
image: confluentinc/cp-ksqldb-server:7.5.3
115-
hostname: ksqldb-server
116-
container_name: ksqldb-server
117-
depends_on:
118-
- broker
119-
- connect
120-
ports:
121-
- "8088:8088"
122-
environment:
123-
KSQL_CONFIG_DIR: "/etc/ksql"
124-
KSQL_BOOTSTRAP_SERVERS: "broker:29092"
125-
KSQL_HOST_NAME: ksqldb-server
126-
KSQL_LISTENERS: "http://0.0.0.0:8088"
127-
KSQL_CACHE_MAX_BYTES_BUFFERING: 0
128-
KSQL_KSQL_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
129-
KSQL_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
130-
KSQL_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
131-
KSQL_KSQL_CONNECT_URL: "http://connect:8083"
132-
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_REPLICATION_FACTOR: 1
133-
KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: 'true'
134-
KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: 'true'
135-
136-
# ksqldb-cli:
137-
# image: confluentinc/cp-ksqldb-cli:7.5.3
138-
# container_name: ksqldb-cli
139-
# depends_on:
140-
# - broker
141-
# - connect
142-
# - ksqldb-server
143-
# entrypoint: /bin/sh
144-
# tty: true
145-
146-
# ksql-datagen:
147-
# image: confluentinc/ksqldb-examples:7.5.3
148-
# hostname: ksql-datagen
149-
# container_name: ksql-datagen
150-
# depends_on:
151-
# - ksqldb-server
152-
# - broker
153-
# - schema-registry
154-
# - connect
155-
# command: "bash -c 'echo Waiting for Kafka to be ready... && \
156-
# cub kafka-ready -b broker:29092 1 40 && \
157-
# echo Waiting for Confluent Schema Registry to be ready... && \
158-
# cub sr-ready schema-registry 8081 40 && \
159-
# echo Waiting a few seconds for topic creation to finish... && \
160-
# sleep 11 && \
161-
# tail -f /dev/null'"
162-
# environment:
163-
# KSQL_CONFIG_DIR: "/etc/ksql"
164-
# STREAMS_BOOTSTRAP_SERVERS: broker:29092
165-
# STREAMS_SCHEMA_REGISTRY_HOST: schema-registry
166-
# STREAMS_SCHEMA_REGISTRY_PORT: 8081
167-
16890
rest-proxy:
16991
image: confluentinc/cp-kafka-rest:7.5.3
17092
depends_on:
@@ -178,4 +100,28 @@ services:
178100
KAFKA_REST_HOST_NAME: rest-proxy
179101
KAFKA_REST_BOOTSTRAP_SERVERS: 'broker:29092'
180102
KAFKA_REST_LISTENERS: "http://0.0.0.0:8082"
181-
KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
103+
KAFKA_REST_SCHEMA_REGISTRY_URL: 'http://schema-registry:8081'
104+
105+
# Uncomment for a helpful UI
106+
# control-center:
107+
# image: confluentinc/cp-enterprise-control-center:7.5.3
108+
# hostname: control-center
109+
# container_name: control-center
110+
# depends_on:
111+
# - broker
112+
# - schema-registry
113+
# - connect
114+
# ports:
115+
# - "9021:9021"
116+
# environment:
117+
# CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
118+
# CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
119+
# CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
120+
# CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
121+
# CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
122+
# CONTROL_CENTER_REPLICATION_FACTOR: 1
123+
# CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
124+
# CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
125+
# CONFLUENT_METRICS_TOPIC_REPLICATION: 1
126+
# PORT: 9021
127+

csp/adapters/kafka.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ def __init__(
7373

7474
consumer_properties = {
7575
"group.id": group_id,
76-
# To get end of parition notification for live / not live flag
76+
# To get end of partition notification for live / not live flag
7777
"enable.partition.eof": "true",
7878
}
7979

csp/tests/adapters/conftest.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,6 @@ def kafkabroker():
1212
def kafkaadapter(kafkabroker):
1313
group_id = "group.id123"
1414
_kafkaadapter = KafkaAdapterManager(
15-
broker=kafkabroker, group_id=group_id, rd_kafka_conf_options={"allow.auto.create.topics": "true"}
15+
broker=kafkabroker, group_id=group_id
1616
)
1717
return _kafkaadapter

csp/tests/adapters/test_kafka.py

Lines changed: 21 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -79,11 +79,10 @@ def graph(count: int):
7979
}
8080

8181
topic = f"test.metadata.{os.getpid()}"
82-
_precreate_topic(topic)
8382
subKey = "foo"
8483
pubKey = ["mapped_a", "mapped_b", "mapped_c"]
8584

86-
c = csp.count(csp.timer(timedelta(seconds=0.1)))
85+
c = csp.count(csp.timer(timedelta(seconds=0.5)))
8786
t = csp.sample(c, csp.const("foo"))
8887

8988
pubStruct = MetaPubData.collectts(
@@ -104,22 +103,23 @@ def graph(count: int):
104103
)
105104

106105
csp.add_graph_output("sub_data", sub_data)
107-
# csp.print('sub', sub_data)
106+
csp.print('sub', sub_data)
108107
# Wait for at least count ticks and until we get a live tick
109-
done_flag = csp.count(sub_data) >= count
110-
done_flag = csp.and_(done_flag, sub_data.mapped_live is True)
108+
done_flag = csp.and_(csp.count(sub_data) >= count, sub_data.mapped_live == True) # noqa: E712
111109
stop = csp.filter(done_flag, done_flag)
112110
csp.stop_engine(stop)
113111

114-
count = 5
115-
results = csp.run(graph, count, starttime=datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True)
112+
results = csp.run(graph, 5, starttime=datetime.utcnow(), endtime=timedelta(seconds=20), realtime=True)
116113
assert len(results["sub_data"]) >= 5
117114
print(results)
118115
for result in results["sub_data"]:
119116
assert result[1].mapped_partition >= 0
120117
assert result[1].mapped_offset >= 0
121118
assert result[1].mapped_live is not None
122119
assert result[1].mapped_timestamp < datetime.utcnow()
120+
# first record should be non live
121+
assert results["sub_data"][0][1].mapped_live is False
122+
# last record should be live
123123
assert results["sub_data"][-1][1].mapped_live
124124

125125
@pytest.mark.skipif(not os.environ.get("CSP_TEST_KAFKA"), reason="Skipping kafka adapter tests")
@@ -145,8 +145,7 @@ def graph(symbols: list, count: int):
145145
struct_field_map = {"b": "b2", "i": "i2", "d": "d2", "s": "s2", "dt": "dt2"}
146146

147147
done_flags = []
148-
topic = f"mktdata.{os.getpid()}"
149-
_precreate_topic(topic)
148+
150149
for symbol in symbols:
151150
kafkaadapter.publish(msg_mapper, topic, symbol, b, field_map="b")
152151
kafkaadapter.publish(msg_mapper, topic, symbol, i, field_map="i")
@@ -183,10 +182,12 @@ def graph(symbols: list, count: int):
183182
stop = csp.filter(stop, stop)
184183
csp.stop_engine(stop)
185184

185+
topic = f"mktdata.{os.getpid()}"
186+
_precreate_topic(topic)
186187
symbols = ["AAPL", "MSFT"]
187188
count = 100
188189
results = csp.run(
189-
graph, symbols, count, starttime=datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True
190+
graph, symbols, count, starttime=datetime.utcnow(), endtime=timedelta(seconds=10), realtime=True
190191
)
191192
for symbol in symbols:
192193
pub = results[f"pall_{symbol}"]
@@ -212,7 +213,7 @@ def pub_graph():
212213
csp.stop_engine(stop)
213214
# csp.print('pub', struct)
214215

215-
csp.run(pub_graph, starttime=datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True)
216+
csp.run(pub_graph, starttime=datetime.utcnow(), endtime=timedelta(seconds=10), realtime=True)
216217

217218
# grab start/end times
218219
def get_times_graph():
@@ -232,7 +233,7 @@ def get_times_graph():
232233
# csp.print('sub', data)
233234
# csp.print('status', kafkaadapter.status())
234235

235-
all_data = csp.run(get_times_graph, starttime=datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True)[
236+
all_data = csp.run(get_times_graph, starttime=datetime.utcnow(), endtime=timedelta(seconds=10), realtime=True)[
236237
"data"
237238
]
238239
min_time = all_data[0][1].dt
@@ -258,7 +259,7 @@ def get_data(start_offset, expected_count):
258259
KafkaStartOffset.EARLIEST,
259260
10,
260261
starttime=datetime.utcnow(),
261-
endtime=timedelta(seconds=30),
262+
endtime=timedelta(seconds=10),
262263
realtime=True,
263264
)["data"]
264265
# print(res)
@@ -276,7 +277,7 @@ def get_data(start_offset, expected_count):
276277
assert len(res) == 0
277278

278279
res = csp.run(
279-
get_data, KafkaStartOffset.START_TIME, 10, starttime=min_time, endtime=timedelta(seconds=30), realtime=True
280+
get_data, KafkaStartOffset.START_TIME, 10, starttime=min_time, endtime=timedelta(seconds=10), realtime=True
280281
)["data"]
281282
assert len(res) == 10
282283

@@ -287,12 +288,12 @@ def get_data(start_offset, expected_count):
287288
stime = all_data[2][1].dt + timedelta(milliseconds=1)
288289
expected = [x for x in all_data if x[1].dt >= stime]
289290
res = csp.run(
290-
get_data, stime, len(expected), starttime=datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True
291+
get_data, stime, len(expected), starttime=datetime.utcnow(), endtime=timedelta(seconds=10), realtime=True
291292
)["data"]
292293
assert len(res) == len(expected)
293294

294295
res = csp.run(
295-
get_data, timedelta(seconds=0), len(expected), starttime=stime, endtime=timedelta(seconds=30), realtime=True
296+
get_data, timedelta(seconds=0), len(expected), starttime=stime, endtime=timedelta(seconds=10), realtime=True
296297
)["data"]
297298
assert len(res) == len(expected)
298299

@@ -314,8 +315,6 @@ def graph(symbols: list, count: int):
314315
msg_mapper = RawBytesMessageMapper()
315316

316317
done_flags = []
317-
topic = f"test_str.{os.getpid()}"
318-
_precreate_topic(topic)
319318
for symbol in symbols:
320319
topic = f"test_str.{os.getpid()}"
321320
kafkaadapter.publish(msg_mapper, topic, symbol, d)
@@ -356,10 +355,13 @@ def graph(symbols: list, count: int):
356355
stop = csp.filter(stop, stop)
357356
csp.stop_engine(stop)
358357

358+
topic = f"test_str.{os.getpid()}"
359+
_precreate_topic(topic)
360+
359361
symbols = ["AAPL", "MSFT"]
360362
count = 10
361363
results = csp.run(
362-
graph, symbols, count, starttime=datetime.utcnow(), endtime=timedelta(seconds=30), realtime=True
364+
graph, symbols, count, starttime=datetime.utcnow(), endtime=timedelta(seconds=10), realtime=True
363365
)
364366
# print(results)
365367
for symbol in symbols:

csp/tests/adapters/test_status.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ class SubData(csp.Struct):
1414
a: bool
1515

1616

17-
class TestStatus:
17+
class TestStatusKafka:
1818
@pytest.mark.skipif(not os.environ.get("CSP_TEST_KAFKA"), reason="Skipping kafka adapter tests")
1919
def test_basic(self, kafkaadapter):
2020
topic = f"csp.unittest.{os.getpid()}"

0 commit comments

Comments
 (0)