diff options
author | Bohdan Horbeshko <bodqhrohro@gmail.com> | 2022-04-01 04:42:12 +0300 |
---|---|---|
committer | Bohdan Horbeshko <bodqhrohro@gmail.com> | 2022-04-01 04:42:12 +0300 |
commit | 17afd3f8c7a016d5103be949990efb695de865b5 (patch) | |
tree | 5fa9a3a0cbf418b7e4913e9aef2df3d5981030f3 /xmpp | |
parent | 5c238db1da48e6c1d51a3d00f6a661f99de03784 (diff) |
Limit the file storage by an optional quota
Diffstat (limited to 'xmpp')
-rw-r--r-- | xmpp/component.go | 78 | ||||
-rw-r--r-- | xmpp/component_test.go | 47 | ||||
-rw-r--r-- | xmpp/gateway/storage.go | 89 |
3 files changed, 214 insertions, 0 deletions
diff --git a/xmpp/component.go b/xmpp/component.go index 961761b..1749100 100644 --- a/xmpp/component.go +++ b/xmpp/component.go @@ -2,6 +2,8 @@ package xmpp import ( "github.com/pkg/errors" + "regexp" + "strconv" "sync" "time" @@ -20,6 +22,19 @@ var sessions map[string]*telegram.Client var db *persistence.SessionsYamlDB var sessionLock sync.Mutex +const ( + B uint64 = 1 + KB = B << 10 + MB = KB << 10 + GB = MB << 10 + TB = GB << 10 + PB = TB << 10 + EB = PB << 10 + + maxUint64 uint64 = (1 << 64) - 1 +) +var sizeRegex = regexp.MustCompile("\\A([0-9]+) ?([KMGTPE]?B?)\\z") + // NewComponent starts a new component and wraps it in // a stream manager that you should start yourself func NewComponent(conf config.XMPPConfig, tc config.TelegramConfig) (*xmpp.StreamManager, *xmpp.Component, error) { @@ -32,6 +47,13 @@ func NewComponent(conf config.XMPPConfig, tc config.TelegramConfig) (*xmpp.Strea tgConf = tc + if tc.Content.Quota != "" { + gateway.StorageQuota, err = parseSize(tc.Content.Quota) + if err != nil { + log.Warnf("Error parsing the storage quota: %v; the cleaner is disabled", err) + } + } + options := xmpp.ComponentOptions{ TransportConfiguration: xmpp.TransportConfiguration{ Address: conf.Host + ":" + conf.Port, @@ -80,10 +102,22 @@ func heartbeat(component *xmpp.Component) { } sessionLock.Unlock() + quotaLowThreshold := gateway.StorageQuota / 10 * 9 + log.Info("Starting heartbeat queue") // status updater thread for { + gateway.StorageLock.Lock() + if quotaLowThreshold > 0 && tgConf.Content.Path != "" { + gateway.MeasureStorageSize(tgConf.Content.Path) + + if gateway.CachedStorageSize > quotaLowThreshold { + gateway.CleanOldFiles(tgConf.Content.Path, quotaLowThreshold) + } + } + gateway.StorageLock.Unlock() + time.Sleep(60e9) now := time.Now().Unix() @@ -201,3 +235,47 @@ func Close(component *xmpp.Component) { // close stream component.Disconnect() } + +// based on https://github.com/c2h5oh/datasize/blob/master/datasize.go +func parseSize(sSize string) (uint64, error) { + sizeParts := sizeRegex.FindStringSubmatch(sSize) + + if len(sizeParts) > 2 { + numPart, err := strconv.ParseInt(sizeParts[1], 10, 64) + if err != nil { + return 0, err + } + + var divisor uint64 + val := uint64(numPart) + + if len(sizeParts[2]) > 0 { + switch sizeParts[2][0] { + case 'B': + divisor = 1 + case 'K': + divisor = KB + case 'M': + divisor = MB + case 'G': + divisor = GB + case 'T': + divisor = TB + case 'P': + divisor = PB + case 'E': + divisor = EB + } + } + + if divisor == 0 { + return 0, &strconv.NumError{"Wrong suffix", sSize, strconv.ErrSyntax} + } + if val > maxUint64/divisor { + return 0, &strconv.NumError{"Overflow", sSize, strconv.ErrRange} + } + return val * divisor, nil + } + + return 0, &strconv.NumError{"Not enough parts", sSize, strconv.ErrSyntax} +} diff --git a/xmpp/component_test.go b/xmpp/component_test.go new file mode 100644 index 0000000..7e7e5e7 --- /dev/null +++ b/xmpp/component_test.go @@ -0,0 +1,47 @@ +package xmpp + +import ( + "testing" +) + +func TestParseSizeGarbage(t *testing.T) { + _, err := parseSize("abc") + if err == nil { + t.Error("abc should not be accepted") + } +} + +func TestParseSizeAsphalt(t *testing.T) { + size, err := parseSize("2B") + if size != 2 { + t.Errorf("Error parsing two bytes: %v %v", size, err) + } +} + +func TestParseSize9K(t *testing.T) { + size, err := parseSize("9 KB") + if size != 9216 { + t.Errorf("Error parsing 9K: %v %v", size, err) + } +} + +func TestParseSizeBits(t *testing.T) { + size, err := parseSize("9 Kb") + if err == nil { + t.Errorf("Error parsing kilobits: %v %v", size, err) + } +} + +func TestParseSizeEB(t *testing.T) { + size, err := parseSize("3EB") + if size != 3458764513820540928 { + t.Errorf("Error parsing exabytes: %v %v", size, err) + } +} + +func TestParseSizeOverflow(t *testing.T) { + size, err := parseSize("314EB") + if err == nil { + t.Errorf("Overflow is not overflowing: %v %v", size, err) + } +} diff --git a/xmpp/gateway/storage.go b/xmpp/gateway/storage.go new file mode 100644 index 0000000..a3fad79 --- /dev/null +++ b/xmpp/gateway/storage.go @@ -0,0 +1,89 @@ +package gateway + +import ( + "io/ioutil" + "os" + "sort" + "sync" + + log "github.com/sirupsen/logrus" +) + +// StorageQuota is a value from config parsed to bytes number +var StorageQuota uint64 +// CachedStorageSize estimates the storage size between full rescans +var CachedStorageSize uint64 +var StorageLock = sync.Mutex{} + +// MeasureStorageSize replaces the estimated storage size with relevant data from the filesystem +func MeasureStorageSize(path string) { + dents, err := ioutil.ReadDir(path) + if err != nil { + return + } + + var total uint64 + for _, fi := range dents { + if !fi.IsDir() { + total += uint64(fi.Size()) + } + } + + if total != CachedStorageSize { + if CachedStorageSize > 0 { + log.Warnf("Correcting cached storage size: was %v, actually %v", CachedStorageSize, total) + } + CachedStorageSize = total + } +} + +// CleanOldFiles purges the oldest files in a directory that exceed the limit +func CleanOldFiles(path string, limit uint64) { + dents, err := ioutil.ReadDir(path) + if err != nil { + return + } + + var total uint64 + for _, fi := range dents { + if !fi.IsDir() { + total += uint64(fi.Size()) + } + } + + // sort by time + sort.Slice(dents, func(i int, j int) bool { + return dents[i].ModTime().Before(dents[j].ModTime()) + }) + + // purge + if total > limit { + toPurge := total - limit + + var purgedAmount uint64 + var purgedCount uint64 + + for _, fi := range dents { + if !fi.IsDir() { + err = os.Remove(path + string(os.PathSeparator) + fi.Name()) + if err != nil { + log.Errorf("Couldn't remove %v: %v", fi.Name(), err) + continue + } + + purgedAmount += uint64(fi.Size()) + purgedCount += 1 + if purgedAmount >= toPurge { + break + } + } + } + + log.Infof("Cleaned %v bytes of %v old files", purgedAmount, purgedCount) + if CachedStorageSize > purgedAmount { + CachedStorageSize -= purgedAmount + } else { + CachedStorageSize = 0 + } + } +} |