diff --git a/_meta/beat.yml b/_meta/beat.yml index af65e48..4b36eda 100644 --- a/_meta/beat.yml +++ b/_meta/beat.yml @@ -6,6 +6,8 @@ youtubebeat: # Defines how often an event is sent to the output period: 1s start_id: "SmBCZgcGlKk" + parallelism: 5 + max_depth: 10 output.elasticsearch.index: "youtubebeat" diff --git a/beater/youtubebeat.go b/beater/youtubebeat.go index 3dae92b..06d7461 100644 --- a/beater/youtubebeat.go +++ b/beater/youtubebeat.go @@ -38,30 +38,30 @@ func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) { const BaseUrl = "https://www.youtube.com" const BaseSuffix = "/watch?v=" -func scrapeVideos(startId string, fieldsToSend chan common.MapStr, done chan bool) { +func scrapeVideos(b *beat.Beat, bt *Youtubebeat, done chan bool) { videoCollector := colly.NewCollector( colly.AllowedDomains("youtube.com", "www.youtube.com"), colly.Async(true), - colly.MaxDepth(10), + colly.MaxDepth(bt.config.MaxDepth), ) - videoCollector.Limit(&colly.LimitRule{DomainGlob: "*", Parallelism: 5}) + videoCollector.Limit(&colly.LimitRule{DomainGlob: "*", Parallelism: bt.config.Parallelism}) videoCollector.AllowURLRevisit = true videoCollector.OnHTML("body", func(e *colly.HTMLElement) { url := e.Request.URL.String() isPaid := e.ChildAttr("meta[itemprop=\"paid\"]", "content") if isPaid == "True" { - logp.Err("Not parsing video because of isPaid" + url) + logp.Warn("Not parsing video because of isPaid" + url) return } title := e.ChildAttr("meta[itemprop=\"name\"]", "content") if title == "YouTube" { - logp.Err("Not parsing video because of title " + url) + logp.Warn("Not parsing video because of title " + url) return } views, err := strconv.ParseInt(e.ChildAttr("meta[itemprop=\"interactionCount\"]", "content"), 10, 64) if err != nil { - logp.Err("Can't parse view count for URL " + url) + logp.Warn("Can't parse view count for URL " + url) return } date := e.ChildAttr("meta[itemprop=\"datePublished\"]", "content") @@ -70,8 +70,18 @@ func scrapeVideos(startId string, fieldsToSend chan common.MapStr, done chan boo "title": title, "views": views, "date": date, + "type": b.Info.Name, } - fieldsToSend <- fields + event := beat.Event{ + Timestamp: time.Now(), + Fields: fields, + } + + id := strings.Replace(fields["url"].(string), BaseUrl+BaseSuffix, "", -1) + event.SetID(id) + + bt.client.Publish(event) + logp.Info("Event sent") }) videoCollector.OnHTML("a[href]", func(e *colly.HTMLElement) { @@ -81,7 +91,7 @@ func scrapeVideos(startId string, fieldsToSend chan common.MapStr, done chan boo } }) - videoCollector.Visit(BaseUrl + BaseSuffix + startId) + videoCollector.Visit(BaseUrl + BaseSuffix + bt.config.StartId) videoCollector.Wait() done <- true } @@ -96,38 +106,14 @@ func (bt *Youtubebeat) Run(b *beat.Beat) error { return err } - fieldsToSend := make(chan common.MapStr) done := make(chan bool) - go scrapeVideos(bt.config.StartId, fieldsToSend, done) + go scrapeVideos(b, bt, done) - ticker := time.NewTicker(bt.config.Period) - for { - select { - case <-done: - return nil - case <-bt.done: - return nil - case <-ticker.C: - } - // Handle a SIGINT even when no more videos to fetch - select { - case <-done: - return nil - case <-bt.done: - return nil - case fields := <-fieldsToSend: - fields["type"] = b.Info.Name - event := beat.Event{ - Timestamp: time.Now(), - Fields: fields, - } - - id := strings.Replace(fields["url"].(string), BaseUrl+BaseSuffix, "", -1) - event.SetID(id) - - bt.client.Publish(event) - logp.Info("Event sent") - } + select { + case <-done: + return nil + case <-bt.done: + return nil } } diff --git a/config/config.go b/config/config.go index 7d0f392..207b35d 100644 --- a/config/config.go +++ b/config/config.go @@ -6,11 +6,15 @@ package config import "time" type Config struct { - Period time.Duration `config:"period"` - StartId string `config:"start_id"` + Period time.Duration `config:"period"` + StartId string `config:"start_id"` + Parallelism int `config:"parallelism"` + MaxDepth int `config:"max_depth"` } var DefaultConfig = Config{ - Period: 1 * time.Second, - StartId: "SmBCZgcGlKk", + Period: 1 * time.Second, + StartId: "SmBCZgcGlKk", + Parallelism: 5, + MaxDepth: 10, } diff --git a/youtubebeat.reference.yml b/youtubebeat.reference.yml index 27e00ca..495e73d 100644 --- a/youtubebeat.reference.yml +++ b/youtubebeat.reference.yml @@ -6,6 +6,8 @@ youtubebeat: # Defines how often an event is sent to the output period: 1s start_id: "SmBCZgcGlKk" + parallelism: 5 + max_depth: 10 output.elasticsearch.index: "youtubebeat" diff --git a/youtubebeat.yml b/youtubebeat.yml index 6e72fc4..f86d3aa 100644 --- a/youtubebeat.yml +++ b/youtubebeat.yml @@ -6,6 +6,8 @@ youtubebeat: # Defines how often an event is sent to the output period: 1s start_id: "SmBCZgcGlKk" + parallelism: 5 + max_depth: 10 output.elasticsearch.index: "youtubebeat"