summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author siddharth ravikumar <s@ricketyspace.net> 2023-02-06 18:39:06 -0500
committer siddharth ravikumar <s@ricketyspace.net> 2023-02-06 18:39:06 -0500
commit 3322369afcb804f81ed88058e8db8b71df63b4b8 (patch)
tree f016a2983b6be6bc4c8c8c6d09cbf29ca523fcaa
parent 21642cee522a02c36556b60ada4ef9298e9ffbb9 (diff)
feed: update `Feed.Process`
Implement a simple semaphore using a buffered channel for processEntry goroutines. Do not run more than 10 concurrent processEntry goroutines for a given feed.
-rw-r--r-- feed/feed.go 13
1 files changed, 11 insertions, 2 deletions
diff --git a/feed/feed.go b/feed/feed.go
index c2e6112..faacabe 100644
--- a/feed/feed.go
+++ b/feed/feed.go
@@ -153,6 +153,10 @@ func (feed *Feed) Process(pState *state.ProcessState) {
traversed := 0
// Channel for receiving entry results.
erChan := make(chan state.EntryResult)
+ // Simple semaphore to limit the number of concurrent
+ // processEntry calls.
+ // https://go.dev/doc/effective_go#channels
+ eSem := make(chan int, 10)
for _, entry := range feed.Entries {
e := entry
@@ -167,7 +171,7 @@ func (feed *Feed) Process(pState *state.ProcessState) {
// Process entry only if it was not downloaded before.
if !pState.DB.Exists(feed.Id, e.Id) {
- go feed.processEntry(e, erChan)
+ go feed.processEntry(e, erChan, eSem)
processing += 1
} else {
fmt.Printf("[%s][%s]: Already downloaded '%s' before\n",
@@ -211,7 +215,10 @@ func (feed *Feed) Process(pState *state.ProcessState) {
pState.FeedResultChan <- fr
}
-func (feed *Feed) processEntry(entry schema.Entry, erc chan state.EntryResult) {
+func (feed *Feed) processEntry(entry schema.Entry, erc chan state.EntryResult,
+ sema chan int) {
+ sema <- 1 // Wait for semaphore.
+
// Init EntryResult.
er := state.EntryResult{
EntryId: entry.Id,
@@ -227,6 +234,8 @@ func (feed *Feed) processEntry(entry schema.Entry, erc chan state.EntryResult) {
er.Err = err
}
erc <- er
+
+ <-sema // Give up token.
}
func (feed *Feed) ydl(entry schema.Entry) error {