|
74 | 74 | """
|
75 | 75 |
|
76 | 76 | import logging
|
| 77 | +from collections import defaultdict |
77 | 78 | from typing import (
|
78 | 79 | TYPE_CHECKING,
|
79 | 80 | Collection,
|
|
95 | 96 | DatabasePool,
|
96 | 97 | LoggingDatabaseConnection,
|
97 | 98 | LoggingTransaction,
|
| 99 | + PostgresEngine, |
98 | 100 | )
|
99 | 101 | from synapse.storage.databases.main.receipts import ReceiptsWorkerStore
|
100 | 102 | from synapse.storage.databases.main.stream import StreamWorkerStore
|
@@ -463,6 +465,153 @@ def add_thread_id_summary_txn(txn: LoggingTransaction) -> int:
|
463 | 465 |
|
464 | 466 | return result
|
465 | 467 |
|
| 468 | + async def get_unread_counts_by_room_for_user(self, user_id: str) -> Dict[str, int]: |
| 469 | + """Get the notification count by room for a user. Only considers notifications, |
| 470 | + not highlight or unread counts, and threads are currently aggregated under their room. |
| 471 | +
|
| 472 | + This function is intentionally not cached because it is called to calculate the |
| 473 | + unread badge for push notifications and thus the result is expected to change. |
| 474 | +
|
| 475 | + Note that this function assumes the user is a member of the room. Because |
| 476 | + summary rows are not removed when a user leaves a room, the caller must |
| 477 | + filter out those results from the result. |
| 478 | +
|
| 479 | + Returns: |
| 480 | + A map of room ID to notification counts for the given user. |
| 481 | + """ |
| 482 | + return await self.db_pool.runInteraction( |
| 483 | + "get_unread_counts_by_room_for_user", |
| 484 | + self._get_unread_counts_by_room_for_user_txn, |
| 485 | + user_id, |
| 486 | + ) |
| 487 | + |
| 488 | + def _get_unread_counts_by_room_for_user_txn( |
| 489 | + self, txn: LoggingTransaction, user_id: str |
| 490 | + ) -> Dict[str, int]: |
| 491 | + receipt_types_clause, args = make_in_list_sql_clause( |
| 492 | + self.database_engine, |
| 493 | + "receipt_type", |
| 494 | + (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE), |
| 495 | + ) |
| 496 | + args.extend([user_id, user_id]) |
| 497 | + |
| 498 | + receipts_cte = f""" |
| 499 | + WITH all_receipts AS ( |
| 500 | + SELECT room_id, thread_id, MAX(event_stream_ordering) AS max_receipt_stream_ordering |
| 501 | + FROM receipts_linearized |
| 502 | + LEFT JOIN events USING (room_id, event_id) |
| 503 | + WHERE |
| 504 | + {receipt_types_clause} |
| 505 | + AND user_id = ? |
| 506 | + GROUP BY room_id, thread_id |
| 507 | + ) |
| 508 | + """ |
| 509 | + |
| 510 | + receipts_joins = """ |
| 511 | + LEFT JOIN ( |
| 512 | + SELECT room_id, thread_id, |
| 513 | + max_receipt_stream_ordering AS threaded_receipt_stream_ordering |
| 514 | + FROM all_receipts |
| 515 | + WHERE thread_id IS NOT NULL |
| 516 | + ) AS threaded_receipts USING (room_id, thread_id) |
| 517 | + LEFT JOIN ( |
| 518 | + SELECT room_id, thread_id, |
| 519 | + max_receipt_stream_ordering AS unthreaded_receipt_stream_ordering |
| 520 | + FROM all_receipts |
| 521 | + WHERE thread_id IS NULL |
| 522 | + ) AS unthreaded_receipts USING (room_id) |
| 523 | + """ |
| 524 | + |
| 525 | + # First get summary counts by room / thread for the user. We use the max receipt |
| 526 | + # stream ordering of both threaded & unthreaded receipts to compare against the |
| 527 | + # summary table. |
| 528 | + # |
| 529 | + # PostgreSQL and SQLite differ in comparing scalar numerics. |
| 530 | + if isinstance(self.database_engine, PostgresEngine): |
| 531 | + # GREATEST ignores NULLs. |
| 532 | + max_clause = """GREATEST( |
| 533 | + threaded_receipt_stream_ordering, |
| 534 | + unthreaded_receipt_stream_ordering |
| 535 | + )""" |
| 536 | + else: |
| 537 | + # MAX returns NULL if any are NULL, so COALESCE to 0 first. |
| 538 | + max_clause = """MAX( |
| 539 | + COALESCE(threaded_receipt_stream_ordering, 0), |
| 540 | + COALESCE(unthreaded_receipt_stream_ordering, 0) |
| 541 | + )""" |
| 542 | + |
| 543 | + sql = f""" |
| 544 | + {receipts_cte} |
| 545 | + SELECT eps.room_id, eps.thread_id, notif_count |
| 546 | + FROM event_push_summary AS eps |
| 547 | + {receipts_joins} |
| 548 | + WHERE user_id = ? |
| 549 | + AND notif_count != 0 |
| 550 | + AND ( |
| 551 | + (last_receipt_stream_ordering IS NULL AND stream_ordering > {max_clause}) |
| 552 | + OR last_receipt_stream_ordering = {max_clause} |
| 553 | + ) |
| 554 | + """ |
| 555 | + txn.execute(sql, args) |
| 556 | + |
| 557 | + seen_thread_ids = set() |
| 558 | + room_to_count: Dict[str, int] = defaultdict(int) |
| 559 | + |
| 560 | + for room_id, thread_id, notif_count in txn: |
| 561 | + room_to_count[room_id] += notif_count |
| 562 | + seen_thread_ids.add(thread_id) |
| 563 | + |
| 564 | + # Now get any event push actions that haven't been rotated using the same OR |
| 565 | + # join and filter by receipt and event push summary rotated up to stream ordering. |
| 566 | + sql = f""" |
| 567 | + {receipts_cte} |
| 568 | + SELECT epa.room_id, epa.thread_id, COUNT(CASE WHEN epa.notif = 1 THEN 1 END) AS notif_count |
| 569 | + FROM event_push_actions AS epa |
| 570 | + {receipts_joins} |
| 571 | + WHERE user_id = ? |
| 572 | + AND epa.notif = 1 |
| 573 | + AND stream_ordering > (SELECT stream_ordering FROM event_push_summary_stream_ordering) |
| 574 | + AND (threaded_receipt_stream_ordering IS NULL OR stream_ordering > threaded_receipt_stream_ordering) |
| 575 | + AND (unthreaded_receipt_stream_ordering IS NULL OR stream_ordering > unthreaded_receipt_stream_ordering) |
| 576 | + GROUP BY epa.room_id, epa.thread_id |
| 577 | + """ |
| 578 | + txn.execute(sql, args) |
| 579 | + |
| 580 | + for room_id, thread_id, notif_count in txn: |
| 581 | + # Note: only count push actions we have valid summaries for with up to date receipt. |
| 582 | + if thread_id not in seen_thread_ids: |
| 583 | + continue |
| 584 | + room_to_count[room_id] += notif_count |
| 585 | + |
| 586 | + thread_id_clause, thread_ids_args = make_in_list_sql_clause( |
| 587 | + self.database_engine, "epa.thread_id", seen_thread_ids |
| 588 | + ) |
| 589 | + |
| 590 | + # Finally re-check event_push_actions for any rooms not in the summary, ignoring |
| 591 | + # the rotated up-to position. This handles the case where a read receipt has arrived |
| 592 | + # but not been rotated meaning the summary table is out of date, so we go back to |
| 593 | + # the push actions table. |
| 594 | + sql = f""" |
| 595 | + {receipts_cte} |
| 596 | + SELECT epa.room_id, COUNT(CASE WHEN epa.notif = 1 THEN 1 END) AS notif_count |
| 597 | + FROM event_push_actions AS epa |
| 598 | + {receipts_joins} |
| 599 | + WHERE user_id = ? |
| 600 | + AND NOT {thread_id_clause} |
| 601 | + AND epa.notif = 1 |
| 602 | + AND (threaded_receipt_stream_ordering IS NULL OR stream_ordering > threaded_receipt_stream_ordering) |
| 603 | + AND (unthreaded_receipt_stream_ordering IS NULL OR stream_ordering > unthreaded_receipt_stream_ordering) |
| 604 | + GROUP BY epa.room_id |
| 605 | + """ |
| 606 | + |
| 607 | + args.extend(thread_ids_args) |
| 608 | + txn.execute(sql, args) |
| 609 | + |
| 610 | + for room_id, notif_count in txn: |
| 611 | + room_to_count[room_id] += notif_count |
| 612 | + |
| 613 | + return room_to_count |
| 614 | + |
466 | 615 | @cached(tree=True, max_entries=5000, iterable=True)
|
467 | 616 | async def get_unread_event_push_actions_by_room_for_user(
|
468 | 617 | self,
|
|
0 commit comments