Skip to content

Commit d002098

Browse files
authored
Merge pull request #417 from pixlise/development
Release 4.53.0
2 parents a01bba8 + 271971b commit d002098

File tree

111 files changed

+422
-209
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

111 files changed

+422
-209
lines changed

api/config/config.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,13 @@ type APIConfig struct {
101101
MaxFileCacheAgeSec uint
102102
MaxFileCacheSizeBytes uint
103103

104+
// Max time we allow memoised item to exist in DB and not be retrieved.
105+
// If it hasn't been accessed in this many seconds, consider it stale & delete it!
106+
MaxUnretrievedMemoisationAgeSec uint
107+
108+
// How often we run memoisation GC
109+
MemoisationGCIntervalSec uint
110+
104111
ImportJobMaxTimeSec uint32
105112
PIQUANTJobMaxTimeSec uint32
106113

@@ -213,6 +220,14 @@ func Init() (APIConfig, error) {
213220
cfg.PIQUANTJobMaxTimeSec = uint32(15 * 60)
214221
}
215222

223+
if cfg.MaxUnretrievedMemoisationAgeSec <= 0 {
224+
cfg.MaxUnretrievedMemoisationAgeSec = 86400 * 30
225+
}
226+
227+
if cfg.MemoisationGCIntervalSec <= 0 {
228+
cfg.MemoisationGCIntervalSec = 3600
229+
}
230+
216231
cfg.KubeConfig = *kubeconfig
217232

218233
return cfg, nil

api/dataimport/internal/converters/jplbreadboard/spectra_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func Example_getMSASeqNo() {
4444

4545
func Example_getSpectraFiles() {
4646
files := []string{"../something/file.txt", "../something/Normal_B_1_2_3.msa", "../something/BulkSum_B_1_2_3.msa", "../something/Another_B_1_2_3.msa", "../something/Normal_B_1_2.jpg", "Normal_P_1_2_3.msa"}
47-
f, l := getSpectraFiles(files, true, &logger.StdOutLogger{})
47+
f, l := getSpectraFiles(files, true, &logger.StdErrLogger{})
4848

4949
for _, v := range f {
5050
fmt.Printf(v + "\n")

api/dataimport/internal/converters/pixlem/import.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,18 @@ func importEMData(creatorId string, rtt string, beamLocPath string, hkPath strin
292292
}
293293
}
294294

295+
// Check if there are beam locations for images that we don't have
296+
for _, pmc := range ijPMCs {
297+
if _, ok := contextImgsPerPMC[pmc]; !ok {
298+
logger.Infof("WARNING: Missing image for beam location PMC: %v. Beam ij's for this won't be stored.", pmc)
299+
300+
// Delete all stored PMCs for this
301+
for _, beamInfo := range beamLookup {
302+
delete(beamInfo.IJ, pmc)
303+
}
304+
}
305+
}
306+
295307
hkData, err := importerutils.ReadHousekeepingFile(hkPath, 0, logger)
296308
if err != nil {
297309
return nil, err

api/endpoints/memoisation.go

Lines changed: 32 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -62,25 +62,45 @@ func GetMemoise(params apiRouter.ApiHandlerGenericParams) error {
6262
return err
6363
}
6464

65-
// Update last accessed time here
66-
timestamp := uint32(params.Svcs.TimeStamper.GetTimeNowSec())
67-
if timestamp != item.LastReadTimeUnixSec {
68-
update := bson.D{{Key: "$set", Value: bson.D{{Key: "lastreadtimeunixsec", Value: timestamp}}}}
69-
updResult, err := coll.UpdateOne(ctx, filter, update, options.Update())
65+
now := uint32(params.Svcs.TimeStamper.GetTimeNowSec())
66+
67+
// Check if this is passed the max age we allow for an item to live in our cache
68+
if item.LastReadTimeUnixSec < now-uint32(params.Svcs.Config.MaxUnretrievedMemoisationAgeSec) {
69+
// It's too old, delete & don't return
70+
params.Svcs.Log.Infof("Retrieved memoised item: %v that hasn't been accessed in %v sec. Deleting.", key, now-item.LastReadTimeUnixSec)
71+
72+
delResult, err := coll.DeleteOne(ctx, filter, options.Delete())
7073
if err != nil {
7174
// Don't error out on this, but do notify
72-
params.Svcs.Log.Errorf("Failed to update last read time stamp for memoised item: %v. Error: %v", key, err)
73-
}
75+
params.Svcs.Log.Errorf("Failed to delete outdated memoised item: %v. Error: %v", key, err)
76+
} else {
77+
if delResult.DeletedCount != 1 {
78+
params.Svcs.Log.Errorf("Memoised item delete had unexpected counts %+v key: %v", delResult, key)
79+
}
7480

75-
if updResult.ModifiedCount != 1 {
76-
params.Svcs.Log.Errorf("Memoised item timestamp update had unexpected counts %+v key: %v", updResult, key)
81+
return errorwithstatus.MakeNotFoundError(key)
82+
}
83+
} else {
84+
// Update last accessed time here
85+
if now != item.LastReadTimeUnixSec {
86+
update := bson.D{{Key: "$set", Value: bson.D{{Key: "lastreadtimeunixsec", Value: now}}}}
87+
updResult, err := coll.UpdateOne(ctx, filter, update, options.Update())
88+
if err != nil {
89+
// Don't error out on this, but do notify
90+
params.Svcs.Log.Errorf("Failed to update last read time stamp for memoised item: %v. Error: %v", key, err)
91+
}
92+
93+
if updResult.ModifiedCount != 1 {
94+
params.Svcs.Log.Errorf("Memoised item timestamp update had unexpected counts %+v key: %v", updResult, key)
95+
}
96+
97+
// Also set it in the item we're replying with
98+
item.LastReadTimeUnixSec = now
7799
}
78-
79-
// Also set it in the item we're replying with
80-
item.LastReadTimeUnixSec = timestamp
81100
}
82101

83102
utils.SendProtoBinary(params.Writer, item)
103+
84104
return nil
85105
}
86106

api/endpoints/middlewareAuth_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,7 +99,7 @@ func Example_isMatch() {
9999

100100
func Example_getPermissionsForURI() {
101101
var a AuthMiddleWareData
102-
a.Logger = &logger.StdOutLogger{}
102+
a.Logger = &logger.StdErrLogger{}
103103
a.RoutePermissionsRequired = map[string]string{
104104
"GET/the/{id}/something": "root3",
105105
"GET/the/path": "root1",
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package memoisation
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/pixlise/core/v4/api/dbCollections"
8+
"github.com/pixlise/core/v4/core/logger"
9+
"github.com/pixlise/core/v4/core/timestamper"
10+
"go.mongodb.org/mongo-driver/bson"
11+
"go.mongodb.org/mongo-driver/mongo"
12+
"go.mongodb.org/mongo-driver/mongo/options"
13+
)
14+
15+
func RunMemoisationGarbageCollector(intervalSec uint32, oldestAllowedSec uint32, mongoDB *mongo.Database, ts timestamper.ITimeStamper, log logger.ILogger) {
16+
for range time.Tick(time.Second * time.Duration(intervalSec)) {
17+
collectGarbage(mongoDB, oldestAllowedSec, ts, log)
18+
}
19+
}
20+
21+
func collectGarbage(mongoDB *mongo.Database, oldestAllowedSec uint32, ts timestamper.ITimeStamper, log logger.ILogger) {
22+
log.Infof("Memoisation GC starting...")
23+
24+
oldestAllowedUnixSec := ts.GetTimeNowSec() - int64(oldestAllowedSec)
25+
26+
ctx := context.TODO()
27+
opts := options.Delete()
28+
filter := bson.M{"lastreadtimeunixsec": bson.M{"$lt": oldestAllowedUnixSec}}
29+
coll := mongoDB.Collection(dbCollections.MemoisedItemsName)
30+
31+
delResult, err := coll.DeleteMany(ctx, filter, opts)
32+
if err != nil {
33+
log.Errorf("Memoisation GC delete error: %v", err)
34+
}
35+
36+
log.Infof("Memoisation GC deleted %v items", delResult.DeletedCount)
37+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package memoisation
2+
3+
import (
4+
"context"
5+
"fmt"
6+
7+
"github.com/pixlise/core/v4/api/dbCollections"
8+
"github.com/pixlise/core/v4/core/logger"
9+
"github.com/pixlise/core/v4/core/timestamper"
10+
"github.com/pixlise/core/v4/core/wstestlib"
11+
protos "github.com/pixlise/core/v4/generated-protos"
12+
"go.mongodb.org/mongo-driver/bson"
13+
"go.mongodb.org/mongo-driver/mongo/options"
14+
)
15+
16+
func Example_CollectGarbage() {
17+
db := wstestlib.GetDB()
18+
ctx := context.TODO()
19+
coll := db.Collection(dbCollections.MemoisedItemsName)
20+
21+
// Insert an item that's too old, and one that's newly accessed
22+
ts := &timestamper.MockTimeNowStamper{
23+
QueuedTimeStamps: []int64{1234567890, 1234567890, 1234567890, 1234567890},
24+
}
25+
26+
now := uint32(ts.GetTimeNowSec())
27+
maxAge := uint32(100)
28+
29+
item := &protos.MemoisedItem{
30+
Key: "key123",
31+
MemoTimeUnixSec: now - maxAge - 50,
32+
Data: []byte{1, 3, 5, 7},
33+
ScanId: "scan333",
34+
DataSize: uint32(4),
35+
LastReadTimeUnixSec: now - maxAge - 10,
36+
}
37+
38+
opt := options.Update().SetUpsert(true)
39+
_, err := coll.UpdateByID(ctx, item.Key, bson.D{{Key: "$set", Value: item}}, opt)
40+
fmt.Printf("Insert 1: %v\n", err)
41+
42+
item = &protos.MemoisedItem{
43+
Key: "key456",
44+
MemoTimeUnixSec: now - maxAge - 55,
45+
Data: []byte{2, 4, 6, 8, 10},
46+
ScanId: "scan222",
47+
DataSize: uint32(5),
48+
LastReadTimeUnixSec: now - 5,
49+
}
50+
_, err = coll.UpdateByID(ctx, item.Key, bson.D{{Key: "$set", Value: item}}, opt)
51+
fmt.Printf("Insert 2: %v\n", err)
52+
53+
log := &logger.StdOutLogger{}
54+
collectGarbage(db, maxAge, ts, log)
55+
56+
// Output:
57+
// Insert 1: <nil>
58+
// Insert 2: <nil>
59+
// INFO: Memoisation GC starting...
60+
// INFO: Memoisation GC deleted 1 items
61+
}

api/quantification/converter_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ func readDatasetFile(path string) (*protos.Experiment, error) {
140140
}
141141

142142
func Example_matchPMCsWithDataset() {
143-
l := &logger.StdOutLogger{}
143+
l := &logger.StdErrLogger{}
144144
data := csvData{[]string{"X", "Y", "Z", "filename", "Ca_%"}, [][]string{[]string{"1", "0.40", "0", "Roastt_Laguna_Salinas_28kV_230uA_03_03_2020_111.msa", "4.5"}}}
145145

146146
exp, err := readDatasetFile("./testdata/LagunaSalinasdataset.bin")

core/logger/log-stderr.go

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,55 @@
1+
// Licensed to NASA JPL under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. NASA JPL licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package logger
19+
20+
import (
21+
"fmt"
22+
"log"
23+
)
24+
25+
// StdErrLogger - For mocking out in tests
26+
type StdErrLogger struct {
27+
logLevel LogLevel
28+
}
29+
30+
func (l *StdErrLogger) Printf(level LogLevel, format string, a ...interface{}) {
31+
txt := logLevelPrefix[level] + ": " + fmt.Sprintf(format, a...)
32+
log.Println(txt)
33+
}
34+
func (l *StdErrLogger) Debugf(format string, a ...interface{}) {
35+
if l.logLevel <= LogDebug {
36+
l.Printf(LogDebug, format, a...)
37+
}
38+
}
39+
func (l *StdErrLogger) Infof(format string, a ...interface{}) {
40+
if l.logLevel <= LogInfo {
41+
l.Printf(LogInfo, format, a...)
42+
}
43+
}
44+
func (l *StdErrLogger) Errorf(format string, a ...interface{}) {
45+
l.Printf(LogError, format, a...)
46+
}
47+
48+
func (l *StdErrLogger) SetLogLevel(level LogLevel) {
49+
l.logLevel = level
50+
}
51+
func (l *StdErrLogger) GetLogLevel() LogLevel {
52+
return l.logLevel
53+
}
54+
func (l *StdErrLogger) Close() {
55+
}

core/logger/log-stdout.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ package logger
1919

2020
import (
2121
"fmt"
22-
"log"
2322
)
2423

2524
// StdOutLogger - For mocking out in tests
@@ -29,7 +28,7 @@ type StdOutLogger struct {
2928

3029
func (l *StdOutLogger) Printf(level LogLevel, format string, a ...interface{}) {
3130
txt := logLevelPrefix[level] + ": " + fmt.Sprintf(format, a...)
32-
log.Println(txt)
31+
fmt.Println(txt)
3332
}
3433
func (l *StdOutLogger) Debugf(format string, a ...interface{}) {
3534
if l.logLevel <= LogDebug {

0 commit comments

Comments
 (0)