Skip to content

Commit ddf466b

Browse files
committed
feat: processors for enriching documents
1 parent dbbb299 commit ddf466b

File tree

10 files changed

+1311
-135
lines changed

10 files changed

+1311
-135
lines changed

coco.yml

Lines changed: 19 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -323,14 +323,27 @@ pipeline:
323323
group: enriched_documents
324324
fetch_max_messages: 10
325325
processor:
326+
- read_local_file_content: {}
327+
- file_extraction:
328+
tika_endpoint: "http://127.0.0.1:9998"
329+
timeout_in_seconds: 180
330+
chunk_size: 7000
326331
- document_summarization:
327-
model: $[[env.ENRICHMENT_MODEL]]
328-
input_queue: "indexing_documents"
329-
min_input_document_length: 500
332+
model_provider: deepseek
333+
model: deepseek-chat
334+
model_context_length: 128000
335+
max_summary_length: 300
336+
- document_embedding:
337+
model_provider: qianwen
338+
model: text-embedding-v4
339+
# Dimension of the vectors generated by your embedding model
340+
embedding_dimension: 1024
341+
# Chunk size in characters, it is recommended to use a value
342+
# lower than you embedding model's input limit, e.g., if the
343+
# limit is 8192 tokens, then we can set chunk size to 7000.
344+
chunk_size: 7000
330345
output_queue:
331346
name: "enriched_documents"
332-
label:
333-
tag: "enriched"
334347

335348
- name: merge_documents
336349
auto_start: true
@@ -362,6 +375,7 @@ pipeline:
362375
queues:
363376
type: indexing_merge
364377
tag: "merged"
378+
365379
- name: connector_dispatcher
366380
auto_start: true
367381
keep_running: true

core/document.go

Lines changed: 79 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
package core
66

77
import (
8+
"fmt"
89
"strings"
910
"time"
1011
)
@@ -41,8 +42,9 @@ type Document struct {
4142
Title string `json:"title,omitempty" elastic_mapping:"title:{type:text,copy_to:combined_fulltext,fields:{keyword: {type: keyword}, pinyin: {type: text, analyzer: pinyin_analyzer}}}"` // Document title
4243
Summary string `json:"summary,omitempty" elastic_mapping:"summary:{type:text,copy_to:combined_fulltext}"` // Brief summary or description of the document
4344

44-
Lang string `json:"lang,omitempty" elastic_mapping:"lang:{type:keyword,copy_to:combined_fulltext}"` // Language code (e.g., "en", "fr")
45-
Content string `json:"content,omitempty" elastic_mapping:"content:{type:text,copy_to:combined_fulltext}"` // Document content for full-text indexing
45+
Lang string `json:"lang,omitempty" elastic_mapping:"lang:{type:keyword,copy_to:combined_fulltext}"` // Language code (e.g., "en", "fr")
46+
Content string `json:"content,omitempty" elastic_mapping:"content:{type:text,copy_to:combined_fulltext}"` // Document content for full-text indexing
47+
Chunks []DocumentChunk `json:"document_chunk,omitempty" elastic_mapping:"document_chunk:{type:nested}"`
4648

4749
Icon string `json:"icon,omitempty" elastic_mapping:"icon:{enabled:false}"` // Icon Key, need work with datasource's assets to get the icon url, if it is a full url, then use it directly
4850
Thumbnail string `json:"thumbnail,omitempty" elastic_mapping:"thumbnail:{enabled:false}"` // Thumbnail image URL, for preview purposes
@@ -113,3 +115,78 @@ type UserInfo struct {
113115
UserName string `json:"username,omitempty" elastic_mapping:"username:{type:keyword,copy_to:combined_fulltext}"` // Login of the user
114116
UserID string `json:"userid,omitempty" elastic_mapping:"userid:{type:keyword,copy_to:combined_fulltext}"` // Unique identifier for the user
115117
}
118+
119+
type DocumentChunk struct {
120+
Range ChunkRange `json:"range" elastic_mapping:"range:{type:object}"`
121+
Text string `json:"text" elastic_mapping:"text:{type:text}"`
122+
Embedding Embedding `json:"embedding" elastic_mapping:"embedding:{type:object}"`
123+
}
124+
125+
// A `Embedding` stores a chunk's embedding.
126+
//
127+
// Only 1 field will be used, depending on the chosen embedding dimension, see
128+
// the `Dimension` field above.
129+
//
130+
// Having so many `EmbeddingXxx` fields is embarrasing, but we have no choice
131+
// since vector dimension is part of the type information and elastic mapping
132+
// has to be static.
133+
//
134+
// If you add or remove fields, please update variable "SupportedEmbeddingDimensions"
135+
// as well.
136+
type Embedding struct {
137+
Embedding128 []float32 `json:"embedding128,omitempty" elastic_mapping:"embedding128:{type:knn_dense_float_vector,knn:{dims:128,model:lsh,similarity:cosine,L:99,k:1}}"`
138+
Embedding256 []float32 `json:"embedding256,omitempty" elastic_mapping:"embedding256:{type:knn_dense_float_vector,knn:{dims:256,model:lsh,similarity:cosine,L:99,k:1}}"`
139+
Embedding384 []float32 `json:"embedding384,omitempty" elastic_mapping:"embedding384:{type:knn_dense_float_vector,knn:{dims:384,model:lsh,similarity:cosine,L:99,k:1}}"`
140+
Embedding512 []float32 `json:"embedding512,omitempty" elastic_mapping:"embedding512:{type:knn_dense_float_vector,knn:{dims:512,model:lsh,similarity:cosine,L:99,k:1}}"`
141+
Embedding768 []float32 `json:"embedding768,omitempty" elastic_mapping:"embedding768:{type:knn_dense_float_vector,knn:{dims:768,model:lsh,similarity:cosine,L:99,k:1}}"`
142+
Embedding1024 []float32 `json:"embedding1024,omitempty" elastic_mapping:"embedding1024:{type:knn_dense_float_vector,knn:{dims:1024,model:lsh,similarity:cosine,L:99,k:1}}"`
143+
Embedding1536 []float32 `json:"embedding1536,omitempty" elastic_mapping:"embedding1536:{type:knn_dense_float_vector,knn:{dims:1536,model:lsh,similarity:cosine,L:99,k:1}}"`
144+
Embedding2048 []float32 `json:"embedding2048,omitempty" elastic_mapping:"embedding2048:{type:knn_dense_float_vector,knn:{dims:2048,model:lsh,similarity:cosine,L:99,k:1}}"`
145+
Embedding2560 []float32 `json:"embedding2560,omitempty" elastic_mapping:"embedding2560:{type:knn_dense_float_vector,knn:{dims:2560,model:lsh,similarity:cosine,L:99,k:1}}"`
146+
Embedding4096 []float32 `json:"embedding4096,omitempty" elastic_mapping:"embedding4096:{type:knn_dense_float_vector,knn:{dims:4096,model:lsh,similarity:cosine,L:99,k:1}}"`
147+
}
148+
149+
// Set the actual value of this "Embedding"
150+
func (e *Embedding) SetValue(embedding []float32) {
151+
dimension := len(embedding)
152+
switch dimension {
153+
case 128:
154+
e.Embedding128 = embedding
155+
case 256:
156+
e.Embedding256 = embedding
157+
case 384:
158+
e.Embedding384 = embedding
159+
case 512:
160+
e.Embedding512 = embedding
161+
case 768:
162+
e.Embedding768 = embedding
163+
case 1024:
164+
e.Embedding1024 = embedding
165+
case 1536:
166+
e.Embedding1536 = embedding
167+
case 2048:
168+
e.Embedding2048 = embedding
169+
case 2560:
170+
e.Embedding2560 = embedding
171+
case 4096:
172+
e.Embedding4096 = embedding
173+
default:
174+
panic(fmt.Sprintf("embedding's dimension is invalid, we accept %v", SupportedEmbeddingDimensions))
175+
}
176+
}
177+
178+
// Embedding dimensions supported by us, it should be kept sync with the
179+
// "EmbeddingXxx" fields of struct Embedding
180+
var SupportedEmbeddingDimensions = []int32{128, 256, 384, 512, 768, 1024, 1536, 2048, 2560, 4096}
181+
182+
// Range of a chunk.
183+
//
184+
// A chunk contains roughly the same amount of tokens, say 8192 tokens. And
185+
// thus, a chunk can span many pages if these pages are small, or it is only
186+
// part of a page if the page is big.
187+
type ChunkRange struct {
188+
// Start page of this chunk.
189+
Start int `json:"start" elastic_mapping:"start:{type:integer}"`
190+
// End page of this chuhk. This is **inclusive**.
191+
End int `json:"end" elastic_mapping:"end:{type:integer}"`
192+
}

modules/assistant/langchain/llm.go

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,16 @@
55
package langchain
66

77
import (
8+
"net/http"
9+
"net/http/httputil"
10+
811
log "github.com/cihub/seelog"
912
"github.com/tmc/langchaingo/llms"
1013
"github.com/tmc/langchaingo/llms/ollama"
1114
"github.com/tmc/langchaingo/llms/openai"
1215
"infini.sh/coco/core"
1316
"infini.sh/coco/modules/common"
1417
"infini.sh/framework/core/global"
15-
"net/http"
16-
"net/http/httputil"
1718
)
1819

1920
type LoggingRoundTripper struct {
@@ -61,12 +62,14 @@ func GetLLM(endpoint, apiType, model, token string, keepalive string) llms.Model
6162
openai.WithToken(token),
6263
openai.WithBaseURL(endpoint),
6364
openai.WithModel(model),
65+
openai.WithEmbeddingModel(model),
6466
)
6567
} else {
6668
llm, err = openai.New(
6769
openai.WithToken(token),
6870
openai.WithBaseURL(endpoint),
6971
openai.WithModel(model),
72+
openai.WithEmbeddingModel(model),
7073
)
7174
}
7275

modules/attachment/attachment.go

Lines changed: 30 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,15 +7,16 @@ package attachment
77
import (
88
"errors"
99
"fmt"
10-
"infini.sh/coco/core"
11-
"infini.sh/framework/core/elastic"
1210
"io"
1311
"mime/multipart"
1412
"net/http"
1513
"path/filepath"
1614
"strings"
1715
"time"
1816

17+
"infini.sh/coco/core"
18+
"infini.sh/framework/core/elastic"
19+
1920
log "github.com/cihub/seelog"
2021
httprouter "infini.sh/framework/core/api/router"
2122
"infini.sh/framework/core/kv"
@@ -53,7 +54,7 @@ func (h APIHandler) uploadAttachment(w http.ResponseWriter, r *http.Request, ps
5354
return
5455
}
5556
// Upload to S3
56-
if fileID, err := uploadToBlobStore(ctx, file, fileHeader.Filename); err != nil {
57+
if fileID, err := UploadToBlobStore(ctx, "", file, fileHeader.Filename, "", false); err != nil {
5758
http.Error(w, err.Error(), http.StatusInternalServerError)
5859
return
5960
} else {
@@ -241,7 +242,18 @@ func getMimeType(file multipart.File) (string, error) {
241242
return mimeType, nil
242243
}
243244

244-
func uploadToBlobStore(ctx *orm.Context, file multipart.File, fileName string) (string, error) {
245+
// Helper function to upload the attachment specified by [file] to the
246+
// blob store.
247+
//
248+
// Arguments:
249+
//
250+
// - If [fileID] is not an empty string, it will be used as the file ID.
251+
// Otherwise, a random ID will be created and used.
252+
// - If [ownerID] is not empty, the created attached will set the owner to it.
253+
// Otherwise, owner information will be extracted from cotnext [ctx].
254+
// - [replaceIfExists]: If this is true and there is already an attachment with
255+
// the same file ID eixsts, replace it.
256+
func UploadToBlobStore(ctx *orm.Context, fileID string, file multipart.File, fileName string, ownerID string, replaceIfExists bool) (string, error) {
245257
defer func() {
246258
_ = file.Close()
247259
}()
@@ -252,7 +264,9 @@ func uploadToBlobStore(ctx *orm.Context, file multipart.File, fileName string) (
252264
return "", fmt.Errorf("failed to read file %s: %v", fileName, err)
253265
}
254266

255-
fileID := util.GetUUID()
267+
if fileID == "" {
268+
fileID = util.GetUUID()
269+
}
256270
fileSize := len(data)
257271
mimeType, _ := getMimeType(file)
258272

@@ -264,14 +278,24 @@ func uploadToBlobStore(ctx *orm.Context, file multipart.File, fileName string) (
264278
attachment.Icon = getFileExtension(fileName)
265279
attachment.URL = fmt.Sprintf("/attachment/%v", fileID)
266280
//attachment.Owner //TODO
281+
if ownerID != "" {
282+
attachment.SetOwnerID(ownerID)
283+
}
267284

268285
//save attachment metadata
269-
err = orm.Create(ctx, &attachment)
286+
if replaceIfExists {
287+
err = orm.Upsert(ctx, &attachment)
288+
} else {
289+
err = orm.Create(ctx, &attachment)
290+
}
270291
if err != nil {
271292
panic(err)
272293
}
273294

274295
//save attachment payload
296+
//
297+
// kv.AddValue will replace the previous value if it already exists so we
298+
// don't need to check [replaceIfExists] here.
275299
err = kv.AddValue(AttachmentKVBucket, []byte(fileID), data)
276300
if err != nil {
277301
panic(err)

plugins/connectors/local_fs/plugin.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -154,9 +154,10 @@ func (p *Plugin) saveDocument(ctx *pipeline.Context, currentPath, basePath strin
154154
Source: core.DataSourceReference{ID: datasource.ID, Type: "connector", Name: datasource.Name},
155155
Type: connectors.TypeFile,
156156
Category: filepath.Dir(currentPath),
157-
Content: "", // skip content
158-
URL: currentPath,
159-
Size: int(fileInfo.Size()),
157+
// skip content here, which will be popluated by the `read_file_content` processor
158+
Content: "",
159+
URL: currentPath,
160+
Size: int(fileInfo.Size()),
160161
}
161162
doc.System = datasource.System
162163
if doc.System == nil {

0 commit comments

Comments
 (0)