youtubebeat/beater/youtubebeat.go

135 lines
3.0 KiB
Go

package beater
import (
"fmt"
"github.com/gocolly/colly"
"strconv"
"strings"
"time"
"github.com/elastic/beats/libbeat/beat"
"github.com/elastic/beats/libbeat/common"
"github.com/elastic/beats/libbeat/logp"
"github.com/Crocmagnon/youtubebeat/config"
)
// Youtubebeat configuration.
type Youtubebeat struct {
done chan struct{}
config config.Config
client beat.Client
}
// New creates an instance of youtubebeat.
func New(b *beat.Beat, cfg *common.Config) (beat.Beater, error) {
c := config.DefaultConfig
if err := cfg.Unpack(&c); err != nil {
return nil, fmt.Errorf("Error reading config file: %v", err)
}
bt := &Youtubebeat{
done: make(chan struct{}),
config: c,
}
return bt, nil
}
const BaseUrl = "https://www.youtube.com"
const BaseSuffix = "/watch?v="
func scrapeVideos(startId string, fieldsToSend chan common.MapStr, done chan bool) {
videoCollector := colly.NewCollector(
colly.AllowedDomains("youtube.com", "www.youtube.com"),
colly.Async(true),
colly.MaxDepth(10),
)
videoCollector.OnHTML("body", func(e *colly.HTMLElement) {
url := e.Request.URL.String()
isPaid := e.ChildAttr("meta[itemprop=\"paid\"]", "content")
if isPaid == "True" {
logp.Err("Not parsing video because of isPaid" + url)
return
}
title := e.ChildAttr("meta[itemprop=\"name\"]", "content")
if title == "YouTube" {
logp.Err("Not parsing video because of title " + url)
return
}
views, err := strconv.ParseInt(e.ChildAttr("meta[itemprop=\"interactionCount\"]", "content"), 10, 64)
if err != nil {
logp.Err("Can't parse view count for URL " + url)
return
}
date := e.ChildAttr("meta[itemprop=\"datePublished\"]", "content")
fields := common.MapStr{
"url": url,
"title": title,
"views": views,
"date": date,
}
fieldsToSend <- fields
})
videoCollector.OnHTML("a[href]", func(e *colly.HTMLElement) {
href := e.Attr("href")
if strings.HasPrefix(href, BaseSuffix) {
e.Request.Visit(BaseUrl + href)
}
})
videoCollector.Visit(BaseUrl + BaseSuffix + startId)
videoCollector.Wait()
done <- true
}
// Run starts youtubebeat.
func (bt *Youtubebeat) Run(b *beat.Beat) error {
logp.Info("youtubebeat is running! Hit CTRL-C to stop it.")
var err error
bt.client, err = b.Publisher.Connect()
if err != nil {
return err
}
fieldsToSend := make(chan common.MapStr)
done := make(chan bool)
go scrapeVideos(bt.config.StartId, fieldsToSend, done)
ticker := time.NewTicker(bt.config.Period)
for {
select {
case <-bt.done:
return nil
case <-ticker.C:
}
// Handle a SIGINT even when no more videos to fetch
select {
case <-done:
return nil
case <-bt.done:
return nil
case fields := <-fieldsToSend:
fields["type"] = b.Info.Name
event := beat.Event{
Timestamp: time.Now(),
Fields: fields,
}
id := strings.Replace(fields["url"].(string), BaseUrl+BaseSuffix, "", -1)
event.SetID(id)
bt.client.Publish(event)
logp.Info("Event sent")
}
}
}
// Stop stops youtubebeat.
func (bt *Youtubebeat) Stop() {
bt.client.Close()
close(bt.done)
}