@@ -4,39 +4,87 @@ import (
44 "context"
55 "io"
66 "os"
7+ "strconv"
78
9+ "github.com/tuihub/librarian/app/sephirah/internal/model/modelbinah"
810 "github.com/tuihub/librarian/internal/lib/libauth"
11+ "github.com/tuihub/librarian/internal/model"
912 mapper "github.com/tuihub/protos/pkg/librarian/mapper/v1"
1013 porter "github.com/tuihub/protos/pkg/librarian/porter/v1"
1114 searcher "github.com/tuihub/protos/pkg/librarian/searcher/v1"
1215 pb "github.com/tuihub/protos/pkg/librarian/sephirah/v1"
1316
1417 "github.com/go-kratos/kratos/v2/errors"
18+ "github.com/google/uuid"
1519)
1620
1721type UploadFile struct {
18- callback UploadCallbackFunc
22+ id model.InternalID
23+ porter porter.LibrarianPorterServiceClient
24+ callback modelbinah.UploadCallbackFunc
1925 file * os.File
2026 Writer io.Writer
2127}
2228
23- func (f * UploadFile ) Finish () error {
24- return f .callback (f )
29+ func (f * UploadFile ) Finish (ctx context.Context ) error {
30+ cli , err := f .porter .PushData (ctx )
31+ if err != nil {
32+ return err
33+ }
34+ err = cli .Send (& porter.PushDataRequest {
35+ Content : & porter.PushDataRequest_Metadata {
36+ Metadata : & porter.PushDataRequest_DataMeta {
37+ Source : porter .DataSource_DATA_SOURCE_INTERNAL_DEFAULT ,
38+ ContentId : strconv .FormatInt (int64 (f .id ), 10 ),
39+ },
40+ },
41+ })
42+ if err != nil {
43+ return err
44+ }
45+ buf := make ([]byte , 16 << 10 ) //nolint:gomnd //TODO
46+ for {
47+ _ , err = f .file .Read (buf )
48+ if errors .Is (err , io .EOF ) {
49+ _ , err = cli .CloseAndRecv ()
50+ if err != nil {
51+ return err
52+ }
53+ break
54+ }
55+ if err != nil {
56+ return err
57+ }
58+ err = cli .Send (& porter.PushDataRequest {
59+ Content : & porter.PushDataRequest_Data {
60+ Data : buf ,
61+ },
62+ })
63+ if err != nil {
64+ return err
65+ }
66+ }
67+ return f .callback ()
2568}
2669
2770type BinahRepo interface {
2871}
2972
3073type Binah struct {
31- callback CallbackControlBlock
74+ callback * modelbinah. ControlBlock
3275 auth * libauth.Auth
3376 mapper mapper.LibrarianMapperServiceClient
3477 porter porter.LibrarianPorterServiceClient
3578 searcher searcher.LibrarianSearcherServiceClient
3679}
3780
38- func NewBinah (callback CallbackControlBlock , auth * libauth.Auth , mClient mapper.LibrarianMapperServiceClient ,
39- pClient porter.LibrarianPorterServiceClient , sClient searcher.LibrarianSearcherServiceClient ) * Binah {
81+ func NewBinah (
82+ callback * modelbinah.ControlBlock ,
83+ auth * libauth.Auth ,
84+ mClient mapper.LibrarianMapperServiceClient ,
85+ pClient porter.LibrarianPorterServiceClient ,
86+ sClient searcher.LibrarianSearcherServiceClient ,
87+ ) * Binah {
4088 return & Binah {
4189 callback : callback ,
4290 auth : auth ,
@@ -46,25 +94,31 @@ func NewBinah(callback CallbackControlBlock, auth *libauth.Auth, mClient mapper.
4694 }
4795}
4896
49- func (b * Binah ) getUploadCallback (id UploadCallbackID ) UploadCallbackFunc {
50- f , exist := b .callback .uploadCallbackMap [id ]
51- if exist {
52- return f
53- }
54- return emptyUploadCallback
97+ func NewControlBlock (a * libauth.Auth ) * modelbinah.ControlBlock {
98+ return modelbinah .NewControlBlock (a )
5599}
56100
57101func (b * Binah ) NewUploadFile (ctx context.Context ) (* UploadFile , * errors.Error ) {
58102 claims , exist := libauth .FromContext (ctx )
59103 if ! exist || claims == nil || claims .TransferMetadata == nil {
60104 return nil , pb .ErrorErrorReasonUnauthorized ("token required" )
61105 }
62- f , err := os .CreateTemp ("" , claims .TransferMetadata .Name )
106+ callback , err := b .callback .GetUploadCallback (ctx )
107+ if err != nil {
108+ return nil , pb .ErrorErrorReasonUnspecified ("%s" , err .Error ())
109+ }
110+ meta , err := b .callback .GetUploadFileMetadata (ctx )
111+ if err != nil {
112+ return nil , pb .ErrorErrorReasonUnspecified ("%s" , err .Error ())
113+ }
114+ f , err := os .CreateTemp ("" , uuid .NewString ())
63115 if err != nil {
64116 return nil , pb .ErrorErrorReasonUnspecified ("create temp file failed" )
65117 }
66118 return & UploadFile {
67- callback : b .getUploadCallback (UploadCallbackID (claims .TransferMetadata .CallBack )),
119+ id : meta .ID ,
120+ porter : b .porter ,
121+ callback : callback ,
68122 file : f ,
69123 Writer : f ,
70124 }, nil
0 commit comments