diff options
author | siddharth ravikumar <s@ricketyspace.net> | 2023-02-06 18:39:06 -0500 |
---|---|---|
committer | siddharth ravikumar <s@ricketyspace.net> | 2023-02-06 18:39:06 -0500 |
commit | 3322369afcb804f81ed88058e8db8b71df63b4b8 (patch) | |
tree | f016a2983b6be6bc4c8c8c6d09cbf29ca523fcaa | |
parent | 21642cee522a02c36556b60ada4ef9298e9ffbb9 (diff) |
feed: update `Feed.Process`
Implement a simple semaphore using channels for processEntry go
routines. Do not run more than 10 concurrent processEntries routines
for a given feed.
-rw-r--r-- | feed/feed.go | 13 |
1 files changed, 11 insertions, 2 deletions
diff --git a/feed/feed.go b/feed/feed.go index c2e6112..faacabe 100644 --- a/feed/feed.go +++ b/feed/feed.go @@ -153,6 +153,10 @@ func (feed *Feed) Process(pState *state.ProcessState) { traversed := 0 // Channel for receiving entry results. erChan := make(chan state.EntryResult) + // Simple semaphore to limit the number of concurrent + // processEntry calls. + // https://go.dev/doc/effective_go#channels + eSem := make(chan int, 10) for _, entry := range feed.Entries { e := entry @@ -167,7 +171,7 @@ func (feed *Feed) Process(pState *state.ProcessState) { // Process entry only if it was not downloaded before. if !pState.DB.Exists(feed.Id, e.Id) { - go feed.processEntry(e, erChan) + go feed.processEntry(e, erChan, eSem) processing += 1 } else { fmt.Printf("[%s][%s]: Already downloaded '%s' before\n", @@ -211,7 +215,10 @@ func (feed *Feed) Process(pState *state.ProcessState) { pState.FeedResultChan <- fr } -func (feed *Feed) processEntry(entry schema.Entry, erc chan state.EntryResult) { +func (feed *Feed) processEntry(entry schema.Entry, erc chan state.EntryResult, + sema chan int) { + sema <- 1 // Wait for semaphore. + // Init EntryResult. er := state.EntryResult{ EntryId: entry.Id, @@ -227,6 +234,8 @@ func (feed *Feed) processEntry(entry schema.Entry, erc chan state.EntryResult) { er.Err = err } erc <- er + + <-sema // Give up token. } func (feed *Feed) ydl(entry schema.Entry) error { |