Files
iknowyou/back/internal/search/manager.go
2026-04-06 15:12:34 +02:00

275 lines
5.7 KiB
Go

package search
import (
"context"
"fmt"
"reflect"
"sync"
"time"
"github.com/google/uuid"
"github.com/anotherhadi/iknowyou/config"
"github.com/anotherhadi/iknowyou/internal/tools"
)
type Manager struct {
mu sync.RWMutex
searches map[string]*Search
configPath string
factories []func() tools.ToolRunner
searchTTL time.Duration
cleanupInterval time.Duration
done chan struct{} // closed by Stop()
}
func NewManager(configPath string, factories []func() tools.ToolRunner, searchTTL, cleanupInterval time.Duration) *Manager {
m := &Manager{
searches: make(map[string]*Search),
configPath: configPath,
factories: factories,
searchTTL: searchTTL,
cleanupInterval: cleanupInterval,
done: make(chan struct{}),
}
go m.cleanupLoop()
return m
}
func (m *Manager) Stop() {
close(m.done)
}
func (m *Manager) Start(
parentCtx context.Context,
target string,
inputType tools.InputType,
profileName string,
) (*Search, error) {
// "default" is the canonical UI name for the no-filter profile.
if profileName == "default" {
profileName = ""
}
cfg, err := config.Load(m.configPath)
if err != nil {
return nil, fmt.Errorf("manager: loading config: %w", err)
}
activeTools, statuses, err := m.instantiate(cfg, inputType, profileName)
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(parentCtx)
s := &Search{
ID: uuid.NewString(),
Target: target,
InputType: inputType,
Profile: profileName,
StartedAt: time.Now(),
PlannedTools: statuses,
cancelFn: cancel,
status: StatusRunning,
}
m.mu.Lock()
m.searches[s.ID] = s
m.mu.Unlock()
go m.runAll(ctx, s, activeTools)
return s, nil
}
func (m *Manager) Get(id string) (*Search, error) {
return m.get(id)
}
func (m *Manager) All() []*Search {
m.mu.RLock()
defer m.mu.RUnlock()
out := make([]*Search, 0, len(m.searches))
for _, s := range m.searches {
out = append(out, s)
}
return out
}
func (m *Manager) Delete(id string) error {
s, err := m.get(id)
if err != nil {
return err
}
s.Cancel()
m.mu.Lock()
delete(m.searches, id)
m.mu.Unlock()
return nil
}
func (m *Manager) cleanupLoop() {
ticker := time.NewTicker(m.cleanupInterval)
defer ticker.Stop()
for {
select {
case <-ticker.C:
m.purgeExpired()
case <-m.done:
return
}
}
}
func (m *Manager) purgeExpired() {
now := time.Now()
m.mu.Lock()
defer m.mu.Unlock()
for id, s := range m.searches {
ft := s.FinishedAt()
if ft.IsZero() {
continue // still running
}
if now.Sub(ft) > m.searchTTL {
delete(m.searches, id)
}
}
}
func (m *Manager) instantiate(cfg *config.Config, inputType tools.InputType, profileName string) ([]tools.ToolRunner, []ToolStatus, error) {
allNames := make([]string, len(m.factories))
allInstances := make([]tools.ToolRunner, len(m.factories))
for i, factory := range m.factories {
t := factory()
allNames[i] = t.Name()
allInstances[i] = t
}
activeNames, err := cfg.ActiveTools(profileName, allNames)
if err != nil {
return nil, nil, err
}
activeSet := make(map[string]struct{}, len(activeNames))
for _, n := range activeNames {
activeSet[n] = struct{}{}
}
var runners []tools.ToolRunner
var statuses []ToolStatus
for _, tool := range allInstances {
if _, ok := activeSet[tool.Name()]; !ok {
continue
}
if !acceptsInputType(tool, inputType) {
continue
}
if a, ok := tool.(tools.AvailabilityChecker); ok {
if available, reason := a.Available(); !available {
statuses = append(statuses, ToolStatus{
Name: tool.Name(),
Skipped: true,
Reason: reason,
})
continue
}
}
if c, ok := tool.(tools.Configurable); ok {
if err := cfg.DecodeEffective(tool.Name(), profileName, c.ConfigPtr()); err != nil {
return nil, nil, fmt.Errorf("manager: configuring tool %q: %w", tool.Name(), err)
}
if d, ok := tool.(tools.ConfigDescriber); ok {
if missing, fieldName := missingRequiredField(d.ConfigFields()); missing {
statuses = append(statuses, ToolStatus{
Name: tool.Name(),
Skipped: true,
Reason: fmt.Sprintf("missing required config field: %s", fieldName),
})
continue
}
}
}
statuses = append(statuses, ToolStatus{Name: tool.Name()})
runners = append(runners, tool)
}
return runners, statuses, nil
}
func (m *Manager) runAll(ctx context.Context, s *Search, runners []tools.ToolRunner) {
var wg sync.WaitGroup
for _, tool := range runners {
wg.Add(1)
go func(t tools.ToolRunner) {
defer wg.Done()
m.runOne(ctx, s, t)
}(tool)
}
wg.Wait()
s.markDone()
}
func (m *Manager) runOne(ctx context.Context, s *Search, tool tools.ToolRunner) {
out := make(chan tools.Event)
go func() {
_ = tool.Run(ctx, s.Target, s.InputType, out)
}()
var count int
var hasCount bool
for e := range out {
if e.Type == tools.EventTypeCount {
if n, ok := e.Payload.(int); ok {
count += n
hasCount = true
}
continue
}
s.append(e)
}
if hasCount {
s.setToolResultCount(tool.Name(), count)
}
}
func (m *Manager) get(id string) (*Search, error) {
m.mu.RLock()
defer m.mu.RUnlock()
s, ok := m.searches[id]
if !ok {
return nil, fmt.Errorf("search %q not found", id)
}
return s, nil
}
func acceptsInputType(tool tools.ToolRunner, inputType tools.InputType) bool {
for _, t := range tool.InputTypes() {
if t == inputType {
return true
}
}
return false
}
func missingRequiredField(fields []tools.ConfigField) (missing bool, fieldName string) {
for _, f := range fields {
if !f.Required {
continue
}
if f.Value == nil || reflect.DeepEqual(f.Value, reflect.Zero(reflect.TypeOf(f.Value)).Interface()) {
return true, f.Name
}
}
return false, ""
}