Stop the beat when no more videos to fetch
This commit is contained in:
parent
d0ded4fd02
commit
007dbdd3a9
1 changed files with 18 additions and 9 deletions
|
@ -35,7 +35,7 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
|
|||
return bt, nil
|
||||
}
|
||||
|
||||
func scrapeVideos(startId string, fieldsToSend chan common.MapStr) {
|
||||
func scrapeVideos(startId string, fieldsToSend chan common.MapStr, done chan bool) {
|
||||
const BaseUrl = "https://www.youtube.com"
|
||||
const BaseSuffix = "/watch?v="
|
||||
|
||||
|
@ -81,6 +81,7 @@ func scrapeVideos(startId string, fieldsToSend chan common.MapStr) {
|
|||
|
||||
videoCollector.Visit(BaseUrl + BaseSuffix + startId)
|
||||
videoCollector.Wait()
|
||||
done <- true
|
||||
}
|
||||
|
||||
// Run starts youtubebeat.
|
||||
|
@ -94,7 +95,8 @@ func (bt *Youtubebeat) Run(b *beat.Beat) error {
|
|||
}
|
||||
|
||||
fieldsToSend := make(chan common.MapStr)
|
||||
go scrapeVideos(bt.config.StartId, fieldsToSend)
|
||||
done := make(chan bool)
|
||||
go scrapeVideos(bt.config.StartId, fieldsToSend, done)
|
||||
|
||||
ticker := time.NewTicker(bt.config.Period)
|
||||
for {
|
||||
|
@ -103,14 +105,21 @@ func (bt *Youtubebeat) Run(b *beat.Beat) error {
|
|||
return nil
|
||||
case <-ticker.C:
|
||||
}
|
||||
fields := <-fieldsToSend
|
||||
fields["type"] = b.Info.Name
|
||||
event := beat.Event{
|
||||
Timestamp: time.Now(),
|
||||
Fields: fields,
|
||||
// Handle a SIGINT even when no more videos to fetch
|
||||
select {
|
||||
case <-done:
|
||||
return nil
|
||||
case <-bt.done:
|
||||
return nil
|
||||
case fields := <-fieldsToSend:
|
||||
fields["type"] = b.Info.Name
|
||||
event := beat.Event{
|
||||
Timestamp: time.Now(),
|
||||
Fields: fields,
|
||||
}
|
||||
bt.client.Publish(event)
|
||||
logp.Info("Event sent")
|
||||
}
|
||||
bt.client.Publish(event)
|
||||
logp.Info("Event sent")
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Reference in a new issue