diff --git a/beater/youtubebeat.go b/beater/youtubebeat.go index bdade93..3a5e0b6 100644 --- a/beater/youtubebeat.go +++ b/beater/youtubebeat.go @@ -35,7 +35,7 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { return bt, nil } -func scrapeVideos(startId string, fieldsToSend chan common.MapStr) { +func scrapeVideos(startId string, fieldsToSend chan common.MapStr, done chan bool) { const BaseUrl = "https://www.youtube.com" const BaseSuffix = "/watch?v=" @@ -81,6 +81,7 @@ func scrapeVideos(startId string, fieldsToSend chan common.MapStr) { videoCollector.Visit(BaseUrl + BaseSuffix + startId) videoCollector.Wait() + done <- true } // Run starts youtubebeat. @@ -94,7 +95,8 @@ func (bt *Youtubebeat) Run(b *beat.Beat) error { } fieldsToSend := make(chan common.MapStr) - go scrapeVideos(bt.config.StartId, fieldsToSend) + done := make(chan bool) + go scrapeVideos(bt.config.StartId, fieldsToSend, done) ticker := time.NewTicker(bt.config.Period) for { @@ -103,14 +105,21 @@ func (bt *Youtubebeat) Run(b *beat.Beat) error { return nil case <-ticker.C: } - fields := <-fieldsToSend - fields["type"] = b.Info.Name - event := beat.Event{ - Timestamp: time.Now(), - Fields: fields, + // Handle a SIGINT even when no more videos to fetch + select { + case <-done: + return nil + case <-bt.done: + return nil + case fields := <-fieldsToSend: + fields["type"] = b.Info.Name + event := beat.Event{ + Timestamp: time.Now(), + Fields: fields, + } + bt.client.Publish(event) + logp.Info("Event sent") } - bt.client.Publish(event) - logp.Info("Event sent") } }