Skip to content

Commit 04e93a6

Browse files
committed
Ensure file path is correct during stream restore
Also had to change all references from `path.` to `filepath.` when dealing with files, so that it works properly on Windows. Fixed also lots of tests to defer the shutdown of the server after the removal of the storage, and fixed some config files directories to use the single quote `'` to surround the file path, again to work on Windows. Signed-off-by: Ivan Kozlovic <[email protected]>
1 parent 0cb0f6d commit 04e93a6

23 files changed

+469
-496
lines changed

server/filestore.go

Lines changed: 43 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ import (
2929
"io/ioutil"
3030
"net"
3131
"os"
32-
"path"
3332
"path/filepath"
3433
"runtime"
3534
"sort"
@@ -296,8 +295,8 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
296295
fs.fip = !fcfg.AsyncFlush
297296

298297
// Check if this is a new setup.
299-
mdir := path.Join(fcfg.StoreDir, msgDir)
300-
odir := path.Join(fcfg.StoreDir, consumerDir)
298+
mdir := filepath.Join(fcfg.StoreDir, msgDir)
299+
odir := filepath.Join(fcfg.StoreDir, consumerDir)
301300
if err := os.MkdirAll(mdir, defaultDirPerms); err != nil {
302301
return nil, fmt.Errorf("could not create message storage directory - %v", err)
303302
}
@@ -321,7 +320,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
321320
}
322321

323322
// Write our meta data iff does not exist.
324-
meta := path.Join(fcfg.StoreDir, JetStreamMetaFile)
323+
meta := filepath.Join(fcfg.StoreDir, JetStreamMetaFile)
325324
if _, err := os.Stat(meta); err != nil && os.IsNotExist(err) {
326325
if err := fs.writeStreamMeta(); err != nil {
327326
return nil, err
@@ -331,7 +330,7 @@ func newFileStoreWithCreated(fcfg FileStoreConfig, cfg StreamConfig, created tim
331330
// If we expect to be encrypted check that what we are restoring is not plaintext.
332331
// This can happen on snapshot restores or conversions.
333332
if fs.prf != nil {
334-
keyFile := path.Join(fs.fcfg.StoreDir, JetStreamMetaFileKey)
333+
keyFile := filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFileKey)
335334
if _, err := os.Stat(keyFile); err != nil && os.IsNotExist(err) {
336335
if err := fs.writeStreamMeta(); err != nil {
337336
return nil, err
@@ -454,7 +453,7 @@ func (fs *fileStore) writeStreamMeta() error {
454453
return err
455454
}
456455
fs.aek = key
457-
keyFile := path.Join(fs.fcfg.StoreDir, JetStreamMetaFileKey)
456+
keyFile := filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFileKey)
458457
if _, err := os.Stat(keyFile); err != nil && !os.IsNotExist(err) {
459458
return err
460459
}
@@ -463,7 +462,7 @@ func (fs *fileStore) writeStreamMeta() error {
463462
}
464463
}
465464

466-
meta := path.Join(fs.fcfg.StoreDir, JetStreamMetaFile)
465+
meta := filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFile)
467466
if _, err := os.Stat(meta); err != nil && !os.IsNotExist(err) {
468467
return err
469468
}
@@ -484,7 +483,7 @@ func (fs *fileStore) writeStreamMeta() error {
484483
fs.hh.Reset()
485484
fs.hh.Write(b)
486485
checksum := hex.EncodeToString(fs.hh.Sum(nil))
487-
sum := path.Join(fs.fcfg.StoreDir, JetStreamMetaFileSum)
486+
sum := filepath.Join(fs.fcfg.StoreDir, JetStreamMetaFileSum)
488487
if err := ioutil.WriteFile(sum, []byte(checksum), defaultFilePerms); err != nil {
489488
return err
490489
}
@@ -503,10 +502,10 @@ const indexHdrSize = 7*binary.MaxVarintLen64 + hdrLen + checksumSize
503502
func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint64) (*msgBlock, error) {
504503
mb := &msgBlock{fs: fs, index: index, cexp: fs.fcfg.CacheExpire}
505504

506-
mdir := path.Join(fs.fcfg.StoreDir, msgDir)
507-
mb.mfn = path.Join(mdir, fi.Name())
508-
mb.ifn = path.Join(mdir, fmt.Sprintf(indexScan, index))
509-
mb.sfn = path.Join(mdir, fmt.Sprintf(fssScan, index))
505+
mdir := filepath.Join(fs.fcfg.StoreDir, msgDir)
506+
mb.mfn = filepath.Join(mdir, fi.Name())
507+
mb.ifn = filepath.Join(mdir, fmt.Sprintf(indexScan, index))
508+
mb.sfn = filepath.Join(mdir, fmt.Sprintf(fssScan, index))
510509

511510
if mb.hh == nil {
512511
key := sha256.Sum256(fs.hashKeyForBlock(index))
@@ -517,7 +516,7 @@ func (fs *fileStore) recoverMsgBlock(fi os.FileInfo, index uint64) (*msgBlock, e
517516

518517
// Check if encryption is enabled.
519518
if fs.prf != nil {
520-
ekey, err := ioutil.ReadFile(path.Join(mdir, fmt.Sprintf(keyScan, mb.index)))
519+
ekey, err := ioutil.ReadFile(filepath.Join(mdir, fmt.Sprintf(keyScan, mb.index)))
521520
if err != nil {
522521
// We do not seem to have keys even though we should. Could be a plaintext conversion.
523522
// Create the keys and we will double check below.
@@ -863,12 +862,12 @@ func (fs *fileStore) recoverMsgs() error {
863862
defer fs.mu.Unlock()
864863

865864
// Check for any left over purged messages.
866-
pdir := path.Join(fs.fcfg.StoreDir, purgeDir)
865+
pdir := filepath.Join(fs.fcfg.StoreDir, purgeDir)
867866
if _, err := os.Stat(pdir); err == nil {
868867
os.RemoveAll(pdir)
869868
}
870869

871-
mdir := path.Join(fs.fcfg.StoreDir, msgDir)
870+
mdir := filepath.Join(fs.fcfg.StoreDir, msgDir)
872871
fis, err := ioutil.ReadDir(mdir)
873872
if err != nil {
874873
return errNotReadable
@@ -916,21 +915,21 @@ func (fs *fileStore) recoverMsgs() error {
916915

917916
// We had a bug that would leave fss files around during a snapshot.
918917
// Clean them up here if we see them.
919-
if fms, err := filepath.Glob(path.Join(mdir, fssScanAll)); err == nil && len(fms) > 0 {
918+
if fms, err := filepath.Glob(filepath.Join(mdir, fssScanAll)); err == nil && len(fms) > 0 {
920919
for _, fn := range fms {
921920
os.Remove(fn)
922921
}
923922
}
924923
// Same bug for keyfiles but for these we just need to identify orphans.
925-
if kms, err := filepath.Glob(path.Join(mdir, keyScanAll)); err == nil && len(kms) > 0 {
924+
if kms, err := filepath.Glob(filepath.Join(mdir, keyScanAll)); err == nil && len(kms) > 0 {
926925
valid := make(map[uint64]bool)
927926
for _, mb := range fs.blks {
928927
valid[mb.index] = true
929928
}
930929
for _, fn := range kms {
931930
var index uint64
932931
shouldRemove := true
933-
if n, err := fmt.Sscanf(path.Base(fn), keyScan, &index); err == nil && n == 1 && valid[index] {
932+
if n, err := fmt.Sscanf(filepath.Base(fn), keyScan, &index); err == nil && n == 1 && valid[index] {
934933
shouldRemove = false
935934
}
936935
if shouldRemove {
@@ -1516,16 +1515,16 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
15161515
}
15171516
mb.hh = hh
15181517

1519-
mdir := path.Join(fs.fcfg.StoreDir, msgDir)
1520-
mb.mfn = path.Join(mdir, fmt.Sprintf(blkScan, mb.index))
1518+
mdir := filepath.Join(fs.fcfg.StoreDir, msgDir)
1519+
mb.mfn = filepath.Join(mdir, fmt.Sprintf(blkScan, mb.index))
15211520
mfd, err := os.OpenFile(mb.mfn, os.O_CREATE|os.O_RDWR, defaultFilePerms)
15221521
if err != nil {
15231522
mb.dirtyCloseWithRemove(true)
15241523
return nil, fmt.Errorf("Error creating msg block file [%q]: %v", mb.mfn, err)
15251524
}
15261525
mb.mfd = mfd
15271526

1528-
mb.ifn = path.Join(mdir, fmt.Sprintf(indexScan, mb.index))
1527+
mb.ifn = filepath.Join(mdir, fmt.Sprintf(indexScan, mb.index))
15291528
ifd, err := os.OpenFile(mb.ifn, os.O_CREATE|os.O_RDWR, defaultFilePerms)
15301529
if err != nil {
15311530
mb.dirtyCloseWithRemove(true)
@@ -1534,7 +1533,7 @@ func (fs *fileStore) newMsgBlockForWrite() (*msgBlock, error) {
15341533
mb.ifd = ifd
15351534

15361535
// For subject based info.
1537-
mb.sfn = path.Join(mdir, fmt.Sprintf(fssScan, mb.index))
1536+
mb.sfn = filepath.Join(mdir, fmt.Sprintf(fssScan, mb.index))
15381537

15391538
// Check if encryption is enabled.
15401539
if fs.prf != nil {
@@ -1576,8 +1575,8 @@ func (fs *fileStore) genEncryptionKeysForBlock(mb *msgBlock) error {
15761575
return err
15771576
}
15781577
mb.aek, mb.bek, mb.seed, mb.nonce = key, bek, seed, encrypted[:key.NonceSize()]
1579-
mdir := path.Join(fs.fcfg.StoreDir, msgDir)
1580-
keyFile := path.Join(mdir, fmt.Sprintf(keyScan, mb.index))
1578+
mdir := filepath.Join(fs.fcfg.StoreDir, msgDir)
1579+
keyFile := filepath.Join(mdir, fmt.Sprintf(keyScan, mb.index))
15811580
if _, err := os.Stat(keyFile); err != nil && !os.IsNotExist(err) {
15821581
return err
15831582
}
@@ -2119,7 +2118,7 @@ func (mb *msgBlock) compact() {
21192118
mb.closeFDsLocked()
21202119

21212120
// We will write to a new file and mv/rename it in case of failure.
2122-
mfn := path.Join(path.Join(mb.fs.fcfg.StoreDir, msgDir), fmt.Sprintf(newScan, mb.index))
2121+
mfn := filepath.Join(filepath.Join(mb.fs.fcfg.StoreDir, msgDir), fmt.Sprintf(newScan, mb.index))
21232122
defer os.Remove(mfn)
21242123
if err := ioutil.WriteFile(mfn, nbuf, defaultFilePerms); err != nil {
21252124
return
@@ -4061,8 +4060,8 @@ func (fs *fileStore) purge(fseq uint64) (uint64, error) {
40614060

40624061
// Move the msgs directory out of the way, will delete out of band.
40634062
// FIXME(dlc) - These can error and we need to change api above to propagate?
4064-
mdir := path.Join(fs.fcfg.StoreDir, msgDir)
4065-
pdir := path.Join(fs.fcfg.StoreDir, purgeDir)
4063+
mdir := filepath.Join(fs.fcfg.StoreDir, msgDir)
4064+
pdir := filepath.Join(fs.fcfg.StoreDir, purgeDir)
40664065
// If purge directory still exists then we need to wait
40674066
// in place and remove since rename would fail.
40684067
if _, err := os.Stat(pdir); err == nil {
@@ -4593,7 +4592,7 @@ func (fs *fileStore) Delete() error {
45934592
}
45944593
fs.Purge()
45954594

4596-
pdir := path.Join(fs.fcfg.StoreDir, purgeDir)
4595+
pdir := filepath.Join(fs.fcfg.StoreDir, purgeDir)
45974596
// If purge directory still exists then we need to wait
45984597
// in place and remove since rename would fail.
45994598
if _, err := os.Stat(pdir); err == nil {
@@ -4820,13 +4819,13 @@ func (fs *fileStore) streamSnapshot(w io.WriteCloser, state *StreamState, includ
48204819
o.mu.Unlock()
48214820

48224821
// Write all the consumer files.
4823-
if writeFile(path.Join(odirPre, JetStreamMetaFile), meta) != nil {
4822+
if writeFile(filepath.Join(odirPre, JetStreamMetaFile), meta) != nil {
48244823
return
48254824
}
4826-
if writeFile(path.Join(odirPre, JetStreamMetaFileSum), sum) != nil {
4825+
if writeFile(filepath.Join(odirPre, JetStreamMetaFileSum), sum) != nil {
48274826
return
48284827
}
4829-
writeFile(path.Join(odirPre, consumerState), state)
4828+
writeFile(filepath.Join(odirPre, consumerState), state)
48304829
}
48314830
}
48324831

@@ -4909,7 +4908,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt
49094908
if cfg == nil || name == _EMPTY_ {
49104909
return nil, fmt.Errorf("bad consumer config")
49114910
}
4912-
odir := path.Join(fs.fcfg.StoreDir, consumerDir, name)
4911+
odir := filepath.Join(fs.fcfg.StoreDir, consumerDir, name)
49134912
if err := os.MkdirAll(odir, defaultDirPerms); err != nil {
49144913
return nil, fmt.Errorf("could not create consumer directory - %v", err)
49154914
}
@@ -4920,7 +4919,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt
49204919
prf: fs.prf,
49214920
name: name,
49224921
odir: odir,
4923-
ifn: path.Join(odir, consumerState),
4922+
ifn: filepath.Join(odir, consumerState),
49244923
}
49254924
key := sha256.Sum256([]byte(fs.cfg.Name + "/" + name))
49264925
hh, err := highwayhash.New64(key[:])
@@ -4931,7 +4930,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt
49314930

49324931
// Check for encryption.
49334932
if o.prf != nil {
4934-
if ekey, err := ioutil.ReadFile(path.Join(odir, JetStreamMetaFileKey)); err == nil {
4933+
if ekey, err := ioutil.ReadFile(filepath.Join(odir, JetStreamMetaFileKey)); err == nil {
49354934
// Recover key encryption key.
49364935
rb, err := fs.prf([]byte(fs.cfg.Name + tsep + o.name))
49374936
if err != nil {
@@ -4953,7 +4952,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt
49534952
}
49544953

49554954
// Write our meta data iff does not exist.
4956-
meta := path.Join(odir, JetStreamMetaFile)
4955+
meta := filepath.Join(odir, JetStreamMetaFile)
49574956
if _, err := os.Stat(meta); err != nil && os.IsNotExist(err) {
49584957
csi.Created = time.Now().UTC()
49594958
if err := o.writeConsumerMeta(); err != nil {
@@ -4964,7 +4963,7 @@ func (fs *fileStore) ConsumerStore(name string, cfg *ConsumerConfig) (ConsumerSt
49644963
// If we expect to be encrypted check that what we are restoring is not plaintext.
49654964
// This can happen on snapshot restores or conversions.
49664965
if o.prf != nil {
4967-
keyFile := path.Join(odir, JetStreamMetaFileKey)
4966+
keyFile := filepath.Join(odir, JetStreamMetaFileKey)
49684967
if _, err := os.Stat(keyFile); err != nil && os.IsNotExist(err) {
49694968
if err := o.writeConsumerMeta(); err != nil {
49704969
return nil, err
@@ -5418,7 +5417,7 @@ func (o *consumerFileStore) updateConfig(cfg ConsumerConfig) error {
54185417
// Write out the consumer meta data, i.e. state.
54195418
// Lock should be held.
54205419
func (cfs *consumerFileStore) writeConsumerMeta() error {
5421-
meta := path.Join(cfs.odir, JetStreamMetaFile)
5420+
meta := filepath.Join(cfs.odir, JetStreamMetaFile)
54225421
if _, err := os.Stat(meta); err != nil && !os.IsNotExist(err) {
54235422
return err
54245423
}
@@ -5430,7 +5429,7 @@ func (cfs *consumerFileStore) writeConsumerMeta() error {
54305429
return err
54315430
}
54325431
cfs.aek = key
5433-
keyFile := path.Join(cfs.odir, JetStreamMetaFileKey)
5432+
keyFile := filepath.Join(cfs.odir, JetStreamMetaFileKey)
54345433
if _, err := os.Stat(keyFile); err != nil && !os.IsNotExist(err) {
54355434
return err
54365435
}
@@ -5456,7 +5455,7 @@ func (cfs *consumerFileStore) writeConsumerMeta() error {
54565455
cfs.hh.Reset()
54575456
cfs.hh.Write(b)
54585457
checksum := hex.EncodeToString(cfs.hh.Sum(nil))
5459-
sum := path.Join(cfs.odir, JetStreamMetaFileSum)
5458+
sum := filepath.Join(cfs.odir, JetStreamMetaFileSum)
54605459
if err := ioutil.WriteFile(sum, []byte(checksum), defaultFilePerms); err != nil {
54615460
return err
54625461
}
@@ -5787,7 +5786,7 @@ type templateFileStore struct {
57875786
}
57885787

57895788
func newTemplateFileStore(storeDir string) *templateFileStore {
5790-
tdir := path.Join(storeDir, tmplsDir)
5789+
tdir := filepath.Join(storeDir, tmplsDir)
57915790
key := sha256.Sum256([]byte("templates"))
57925791
hh, err := highwayhash.New64(key[:])
57935792
if err != nil {
@@ -5797,11 +5796,11 @@ func newTemplateFileStore(storeDir string) *templateFileStore {
57975796
}
57985797

57995798
func (ts *templateFileStore) Store(t *streamTemplate) error {
5800-
dir := path.Join(ts.dir, t.Name)
5799+
dir := filepath.Join(ts.dir, t.Name)
58015800
if err := os.MkdirAll(dir, defaultDirPerms); err != nil {
58025801
return fmt.Errorf("could not create templates storage directory for %q- %v", t.Name, err)
58035802
}
5804-
meta := path.Join(dir, JetStreamMetaFile)
5803+
meta := filepath.Join(dir, JetStreamMetaFile)
58055804
if _, err := os.Stat(meta); (err != nil && !os.IsNotExist(err)) || err == nil {
58065805
return err
58075806
}
@@ -5818,13 +5817,13 @@ func (ts *templateFileStore) Store(t *streamTemplate) error {
58185817
ts.hh.Reset()
58195818
ts.hh.Write(b)
58205819
checksum := hex.EncodeToString(ts.hh.Sum(nil))
5821-
sum := path.Join(dir, JetStreamMetaFileSum)
5820+
sum := filepath.Join(dir, JetStreamMetaFileSum)
58225821
if err := ioutil.WriteFile(sum, []byte(checksum), defaultFilePerms); err != nil {
58235822
return err
58245823
}
58255824
return nil
58265825
}
58275826

58285827
func (ts *templateFileStore) Delete(t *streamTemplate) error {
5829-
return os.RemoveAll(path.Join(ts.dir, t.Name))
5828+
return os.RemoveAll(filepath.Join(ts.dir, t.Name))
58305829
}

0 commit comments

Comments
 (0)