Skip to content

Commit 42e87f0

Browse files
committed
feat_: add fetched activity persistence
1 parent c3ec149 commit 42e87f0

File tree

4 files changed

+346
-0
lines changed

4 files changed

+346
-0
lines changed
Lines changed: 166 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,166 @@
1+
package activityfetcher
2+
3+
import (
4+
"context"
5+
"database/sql"
6+
"errors"
7+
8+
"github.com/google/uuid"
9+
10+
sq "github.com/Masterminds/squirrel"
11+
12+
"github.com/status-im/status-go/services/wallet/thirdparty"
13+
"github.com/status-im/status-go/sqlite"
14+
15+
"github.com/ethereum/go-ethereum/common"
16+
)
17+
18+
type PersistenceInterface interface {
19+
SaveActivity(ctx context.Context, chainID uint64, parameters thirdparty.ActivityFetchParameters, activity thirdparty.ActivityEntryContainer) error
20+
}
21+
22+
type Persistence struct {
23+
db *sql.DB
24+
}
25+
26+
func NewPersistence(db *sql.DB) *Persistence {
27+
return &Persistence{db: db}
28+
}
29+
30+
func (p *Persistence) SaveActivity(ctx context.Context, chainID uint64, parameters thirdparty.ActivityFetchParameters, activity thirdparty.ActivityEntryContainer) (err error) {
31+
var tx *sql.Tx
32+
tx, err = p.db.Begin()
33+
if err != nil {
34+
return err
35+
}
36+
defer func() {
37+
if err == nil {
38+
err = tx.Commit()
39+
return
40+
}
41+
_ = tx.Rollback()
42+
}()
43+
44+
return saveActivity(tx, chainID, parameters, activity)
45+
}
46+
47+
func saveActivity(creator sqlite.StatementCreator, chainID uint64, parameters thirdparty.ActivityFetchParameters, activity thirdparty.ActivityEntryContainer) error {
48+
id := uuid.New().String()
49+
50+
err := saveFetchParameters(creator, id, chainID, parameters, activity.NextCursor, activity.PreviousCursor, activity.Provider)
51+
if err != nil {
52+
return err
53+
}
54+
55+
err = saveActivityEntries(creator, id, activity.Items)
56+
if err != nil {
57+
return err
58+
}
59+
60+
return nil
61+
}
62+
63+
func saveFetchParameters(creator sqlite.StatementCreator, id string, chainID uint64, parameters thirdparty.ActivityFetchParameters, nextCursor, previousCursor, provider string) error {
64+
q := sq.Insert("fetched_activity_fetch_parameters").
65+
Columns("id", "chain_id", "parameters", "next_cursor", "previous_cursor", "provider").
66+
Values(id, chainID, sqlite.ToJSONBlob(parameters), nextCursor, previousCursor, provider)
67+
68+
query, args, err := q.ToSql()
69+
if err != nil {
70+
return err
71+
}
72+
73+
stmt, err := creator.Prepare(query)
74+
if err != nil {
75+
return err
76+
}
77+
78+
_, err = stmt.Exec(args...)
79+
return err
80+
}
81+
82+
func saveActivityEntries(creator sqlite.StatementCreator, id string, entries []thirdparty.ActivityEntry) error {
83+
for _, entry := range entries {
84+
err := saveActivityEntry(creator, id, entry)
85+
if err != nil {
86+
return err
87+
}
88+
}
89+
return nil
90+
}
91+
92+
func saveActivityEntry(creator sqlite.StatementCreator, id string, entry thirdparty.ActivityEntry) error {
93+
q := sq.Insert("fetched_activity_entries").
94+
Columns("fetch_parameters_id", "entry").
95+
Values(id, sqlite.ToJSONBlob(entry))
96+
97+
query, args, err := q.ToSql()
98+
if err != nil {
99+
return err
100+
}
101+
102+
stmt, err := creator.Prepare(query)
103+
if err != nil {
104+
return err
105+
}
106+
107+
_, err = stmt.Exec(args...)
108+
return err
109+
}
110+
111+
func (p *Persistence) GetActivity(ctx context.Context, chainIDs []uint64, addresses []common.Address, limit uint64) ([]thirdparty.ActivityEntry, error) {
112+
q := sq.Select("e.entry").
113+
From("fetched_activity_entries e").
114+
LeftJoin(`fetched_activity_fetch_parameters fp ON
115+
e.fetch_parameters_id = fp.id`).
116+
Where(sq.And{
117+
sq.Or{
118+
sq.Eq{"e.chain_id_out": chainIDs},
119+
sq.Eq{"e.chain_id_in": chainIDs},
120+
},
121+
sq.Or{
122+
sq.Eq{"e.sender": addresses},
123+
sq.Eq{"e.recipient": addresses},
124+
},
125+
}).GroupBy("e.entry")
126+
127+
if limit > 0 {
128+
q = q.Limit(limit)
129+
}
130+
131+
query, args, err := q.ToSql()
132+
if err != nil {
133+
return nil, err
134+
}
135+
136+
stmt, err := p.db.Prepare(query)
137+
if err != nil {
138+
return nil, err
139+
}
140+
defer stmt.Close()
141+
142+
rows, err := stmt.Query(args...)
143+
if err != nil {
144+
return nil, err
145+
}
146+
defer rows.Close()
147+
148+
return rowsToActivityEntries(rows)
149+
}
150+
151+
func rowsToActivityEntries(rows *sql.Rows) ([]thirdparty.ActivityEntry, error) {
152+
var entries []thirdparty.ActivityEntry
153+
for rows.Next() {
154+
var entry thirdparty.ActivityEntry
155+
var entryJSON = sqlite.ToJSONBlob(&entry)
156+
err := rows.Scan(entryJSON)
157+
if err != nil {
158+
return nil, err
159+
}
160+
if !entryJSON.Valid {
161+
return nil, errors.New("invalid entry")
162+
}
163+
entries = append(entries, entry)
164+
}
165+
return entries, nil
166+
}
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
package activityfetcher_test
2+
3+
import (
4+
"context"
5+
"math/big"
6+
"testing"
7+
8+
"github.com/ethereum/go-ethereum/common"
9+
"github.com/ethereum/go-ethereum/common/hexutil"
10+
"github.com/ethereum/go-ethereum/rpc"
11+
12+
ac "github.com/status-im/status-go/services/wallet/activity/common"
13+
"github.com/status-im/status-go/services/wallet/activityfetcher"
14+
"github.com/status-im/status-go/services/wallet/thirdparty"
15+
"github.com/status-im/status-go/t/helpers"
16+
"github.com/status-im/status-go/walletdatabase"
17+
18+
"github.com/stretchr/testify/require"
19+
)
20+
21+
func ptr[T any](v T) *T {
22+
return &v
23+
}
24+
25+
func TestSaveActivity(t *testing.T) {
26+
walletDB, err := helpers.SetupTestMemorySQLDB(walletdatabase.DbInitializer{})
27+
require.NoError(t, err)
28+
defer walletDB.Close()
29+
30+
persistence := activityfetcher.NewPersistence(walletDB)
31+
32+
sender1 := common.HexToAddress("0x0000000000000000000000000000000000000001")
33+
sender2 := common.HexToAddress("0x0000000000000000000000000000000000000004")
34+
recipient1 := common.HexToAddress("0x0000000000000000000000000000000000000002")
35+
recipient2 := common.HexToAddress("0x0000000000000000000000000000000000000005")
36+
37+
activity := thirdparty.ActivityEntryContainer{
38+
NextCursor: "next",
39+
PreviousCursor: "previous",
40+
Provider: "provider",
41+
Items: []thirdparty.ActivityEntry{
42+
{
43+
Timestamp: 1716153600,
44+
ActivityType: ac.SendAT,
45+
AmountOut: (*hexutil.Big)(big.NewInt(1000000000000000000)), // 1 ETH
46+
TokenOut: &ac.Token{
47+
TokenType: ac.Native,
48+
ChainID: 1,
49+
Address: common.HexToAddress("0x0000000000000000000000000000000000000000"),
50+
},
51+
Sender: sender1,
52+
Recipient: &recipient1,
53+
ChainIDOut: ptr(uint64(1)),
54+
ContractAddress: ptr(common.HexToAddress("0x0000000000000000000000000000000000000003")),
55+
TxHash: common.HexToHash("0x123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef0"),
56+
BlockNumber: (*hexutil.Big)(big.NewInt(17000000)),
57+
},
58+
{
59+
Timestamp: 1716153500,
60+
ActivityType: ac.SwapAT,
61+
AmountOut: (*hexutil.Big)(big.NewInt(1000000000000000000)), // 1 Token A
62+
AmountIn: (*hexutil.Big)(big.NewInt(2000000000000000000)), // 2 Token B
63+
TokenOut: &ac.Token{
64+
TokenType: ac.Erc20,
65+
ChainID: 1,
66+
Address: common.HexToAddress("0x1111111111111111111111111111111111111111"),
67+
},
68+
TokenIn: &ac.Token{
69+
TokenType: ac.Erc20,
70+
ChainID: 1,
71+
Address: common.HexToAddress("0x2222222222222222222222222222222222222222"),
72+
},
73+
Sender: sender2,
74+
Recipient: &recipient2,
75+
ChainIDOut: ptr(uint64(1)),
76+
ChainIDIn: ptr(uint64(1)),
77+
ContractAddress: ptr(common.HexToAddress("0x0000000000000000000000000000000000000006")),
78+
TxHash: common.HexToHash("0xabcdef0123456789abcdef0123456789abcdef0123456789abcdef0123456789"),
79+
BlockNumber: (*hexutil.Big)(big.NewInt(17000001)),
80+
},
81+
},
82+
}
83+
84+
parameters := thirdparty.ActivityFetchParameters{
85+
FromBlock: ptr(rpc.BlockNumber(17000000)),
86+
ToBlock: ptr(rpc.BlockNumber(17000001)),
87+
Address: sender1,
88+
Order: thirdparty.NewToOld,
89+
Direction: thirdparty.Both,
90+
}
91+
92+
// Save the activity
93+
err = persistence.SaveActivity(context.Background(), 1, parameters, activity)
94+
require.NoError(t, err)
95+
96+
// Test retrieving activities for specific addresses and chain IDs
97+
testCases := []struct {
98+
name string
99+
chainIDs []uint64
100+
addresses []common.Address
101+
limit uint64
102+
expected int // expected number of activities
103+
}{
104+
{
105+
name: "fetch by sender1",
106+
chainIDs: []uint64{1},
107+
addresses: []common.Address{sender1},
108+
limit: 10,
109+
expected: 1,
110+
},
111+
{
112+
name: "fetch by recipient2",
113+
chainIDs: []uint64{1},
114+
addresses: []common.Address{recipient2},
115+
limit: 10,
116+
expected: 1,
117+
},
118+
{
119+
name: "fetch by multiple addresses",
120+
chainIDs: []uint64{1},
121+
addresses: []common.Address{sender1, sender2, recipient1, recipient2},
122+
limit: 10,
123+
expected: 2,
124+
},
125+
{
126+
name: "fetch with wrong chain ID",
127+
chainIDs: []uint64{2},
128+
addresses: []common.Address{sender1},
129+
limit: 10,
130+
expected: 0,
131+
},
132+
}
133+
134+
for _, tc := range testCases {
135+
t.Run(tc.name, func(t *testing.T) {
136+
activities, err := persistence.GetActivity(context.Background(), tc.chainIDs, tc.addresses, tc.limit)
137+
require.NoError(t, err)
138+
require.Len(t, activities, tc.expected)
139+
140+
if tc.expected > 0 {
141+
// Verify the content of retrieved activities
142+
for _, act := range activities {
143+
require.NotZero(t, act.Timestamp)
144+
require.NotEmpty(t, act.TxHash)
145+
require.NotNil(t, act.BlockNumber)
146+
}
147+
}
148+
})
149+
}
150+
}

sqlite/fields.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,11 @@ func (blob *JSONBlob) Value() (driver.Value, error) {
6262
return json.Marshal(blob.Data)
6363
}
6464

65+
// ToJSONBlob converts a single value to a JSONBlob
66+
func ToJSONBlob[T any](value T) *JSONBlob {
67+
return &JSONBlob{Data: value}
68+
}
69+
6570
func BigIntToClampedInt64(val *big.Int) *int64 {
6671
if val == nil {
6772
return nil
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
CREATE TABLE IF NOT EXISTS fetched_activity_fetch_parameters (
2+
id TEXT PRIMARY KEY NOT NULL,
3+
chain_id UNSIGNED BIGINT NOT NULL,
4+
parameters JSON NOT NULL,
5+
provider TEXT NOT NULL,
6+
previous_cursor TEXT,
7+
next_cursor TEXT,
8+
created_at TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP
9+
);
10+
11+
CREATE TABLE IF NOT EXISTS fetched_activity_entries (
12+
fetch_parameters_id TEXT NOT NULL,
13+
entry JSON NOT NULL,
14+
timestamp INTEGER NOT NULL AS (json_extract(entry, '$.timestamp')),
15+
chain_id_out UNSIGNED INTEGER AS (json_extract(entry, '$.chainIdOut')),
16+
chain_id_in UNSIGNED INTEGER AS (json_extract(entry, '$.chainIdIn')),
17+
sender BLOB AS (unhex(substr(json_extract(entry, '$.sender'),3))),
18+
recipient BLOB AS (unhex(substr(json_extract(entry, '$.recipient'),3))),
19+
FOREIGN KEY (fetch_parameters_id) REFERENCES fetched_activity_fetch_parameters(id)
20+
);
21+
22+
CREATE INDEX IF NOT EXISTS idx_fetched_activity_entries_per_sender ON fetched_activity_entries (sender);
23+
CREATE INDEX IF NOT EXISTS idx_fetched_activity_entries_per_recipient ON fetched_activity_entries (recipient);
24+
CREATE INDEX IF NOT EXISTS idx_fetched_activity_entries_per_chain_id_out ON fetched_activity_entries (chain_id_out);
25+
CREATE INDEX IF NOT EXISTS idx_fetched_activity_entries_per_chain_id_in ON fetched_activity_entries (chain_id_in);

0 commit comments

Comments
 (0)