aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBohdan Horbeshko <bodqhrohro@gmail.com>2022-04-01 04:42:12 +0300
committerBohdan Horbeshko <bodqhrohro@gmail.com>2022-04-01 04:42:12 +0300
commit17afd3f8c7a016d5103be949990efb695de865b5 (patch)
tree5fa9a3a0cbf418b7e4913e9aef2df3d5981030f3
parent5c238db1da48e6c1d51a3d00f6a661f99de03784 (diff)
Limit the file storage by an optional quota
-rw-r--r--Makefile2
-rw-r--r--config.yml.example1
-rw-r--r--config/config.go1
-rw-r--r--config_schema.json3
-rw-r--r--telegram/utils.go26
-rw-r--r--xmpp/component.go78
-rw-r--r--xmpp/component_test.go47
-rw-r--r--xmpp/gateway/storage.go89
8 files changed, 246 insertions, 1 deletions
diff --git a/Makefile b/Makefile
index 837be39..1f75f76 100644
--- a/Makefile
+++ b/Makefile
@@ -4,7 +4,7 @@ all:
go build -o telegabber
test:
- go test -v ./config ./ ./telegram ./xmpp/gateway ./persistence ./telegram/formatter
+ go test -v ./config ./ ./telegram ./xmpp ./xmpp/gateway ./persistence ./telegram/formatter
lint:
$(GOPATH)/bin/golint ./...
diff --git a/config.yml.example b/config.yml.example
index dae2350..b8de1dd 100644
--- a/config.yml.example
+++ b/config.yml.example
@@ -5,6 +5,7 @@
:link: 'http://tlgrm.localhost/content' # webserver public address
:upload: 'https:///xmppfiles.localhost' # xmpp http upload address
:user: 'www-data' # owner of content files
+ :quota: '256MB' # maximum storage size
:tdlib_verbosity: 1
:tdlib:
:datadir: './sessions/'
diff --git a/config/config.go b/config/config.go
index e88142f..7c685fb 100644
--- a/config/config.go
+++ b/config/config.go
@@ -39,6 +39,7 @@ type TelegramContentConfig struct {
Link string `yaml:":link"`
Upload string `yaml:":upload"`
User string `yaml:":user"`
+ Quota string `yaml:":quota"`
}
// TelegramTdlibConfig is for :tdlib: subtree
diff --git a/config_schema.json b/config_schema.json
index e77b8c3..ab25307 100644
--- a/config_schema.json
+++ b/config_schema.json
@@ -24,6 +24,9 @@
},
":user": {
"type": "string"
+ },
+ ":quota": {
+ "type": "string"
}
}
},
diff --git a/telegram/utils.go b/telegram/utils.go
index f60f35f..b41423a 100644
--- a/telegram/utils.go
+++ b/telegram/utils.go
@@ -361,6 +361,9 @@ func (c *Client) formatFile(file *client.File, compact bool) string {
return ""
}
+ gateway.StorageLock.Lock()
+ defer gateway.StorageLock.Unlock()
+
var link string
var src string
@@ -372,6 +375,9 @@ func (c *Client) formatFile(file *client.File, compact bool) string {
return ""
}
+ size64:= uint64(file.Size)
+ c.prepareDiskSpace(size64)
+
basename := file.Remote.UniqueId + filepath.Ext(src)
dest := c.content.Path + "/" + basename // destination path
link = c.content.Link + "/" + basename // download link
@@ -387,6 +393,8 @@ func (c *Client) formatFile(file *client.File, compact bool) string {
return "<ERROR>"
}
}
+ gateway.CachedStorageSize += size64
+
// chown
if c.content.User != "" {
user, err := osUser.Lookup(c.content.User)
@@ -729,7 +737,12 @@ func (c *Client) messageToPrefix(message *client.Message, previewString string,
}
func (c *Client) ensureDownloadFile(file *client.File) *client.File {
+ gateway.StorageLock.Lock()
+ defer gateway.StorageLock.Unlock()
+
if file != nil {
+ c.prepareDiskSpace(uint64(file.Size))
+
newFile, err := c.DownloadFile(file.Id, 1, true)
if err == nil {
return newFile
@@ -952,3 +965,16 @@ func (c *Client) subscribeToID(id int64, chat *client.Chat) {
args...,
)
}
+
+func (c *Client) prepareDiskSpace(size uint64) {
+ if gateway.StorageQuota > 0 && c.content.Path != "" {
+ var loweredQuota uint64
+ if gateway.StorageQuota >= size {
+ loweredQuota = gateway.StorageQuota - size
+ }
+ if gateway.CachedStorageSize >= loweredQuota {
+ log.Warn("Storage is rapidly clogged")
+ gateway.CleanOldFiles(c.content.Path, loweredQuota)
+ }
+ }
+}
diff --git a/xmpp/component.go b/xmpp/component.go
index 961761b..1749100 100644
--- a/xmpp/component.go
+++ b/xmpp/component.go
@@ -2,6 +2,8 @@ package xmpp
import (
"github.com/pkg/errors"
+ "regexp"
+ "strconv"
"sync"
"time"
@@ -20,6 +22,19 @@ var sessions map[string]*telegram.Client
var db *persistence.SessionsYamlDB
var sessionLock sync.Mutex
+const (
+ B uint64 = 1
+ KB = B << 10
+ MB = KB << 10
+ GB = MB << 10
+ TB = GB << 10
+ PB = TB << 10
+ EB = PB << 10
+
+ maxUint64 uint64 = (1 << 64) - 1
+)
+var sizeRegex = regexp.MustCompile("\\A([0-9]+) ?([KMGTPE]?B?)\\z")
+
// NewComponent starts a new component and wraps it in
// a stream manager that you should start yourself
func NewComponent(conf config.XMPPConfig, tc config.TelegramConfig) (*xmpp.StreamManager, *xmpp.Component, error) {
@@ -32,6 +47,13 @@ func NewComponent(conf config.XMPPConfig, tc config.TelegramConfig) (*xmpp.Strea
tgConf = tc
+ if tc.Content.Quota != "" {
+ gateway.StorageQuota, err = parseSize(tc.Content.Quota)
+ if err != nil {
+ log.Warnf("Error parsing the storage quota: %v; the cleaner is disabled", err)
+ }
+ }
+
options := xmpp.ComponentOptions{
TransportConfiguration: xmpp.TransportConfiguration{
Address: conf.Host + ":" + conf.Port,
@@ -80,10 +102,22 @@ func heartbeat(component *xmpp.Component) {
}
sessionLock.Unlock()
+ quotaLowThreshold := gateway.StorageQuota / 10 * 9
+
log.Info("Starting heartbeat queue")
// status updater thread
for {
+ gateway.StorageLock.Lock()
+ if quotaLowThreshold > 0 && tgConf.Content.Path != "" {
+ gateway.MeasureStorageSize(tgConf.Content.Path)
+
+ if gateway.CachedStorageSize > quotaLowThreshold {
+ gateway.CleanOldFiles(tgConf.Content.Path, quotaLowThreshold)
+ }
+ }
+ gateway.StorageLock.Unlock()
+
time.Sleep(60e9)
now := time.Now().Unix()
@@ -201,3 +235,47 @@ func Close(component *xmpp.Component) {
// close stream
component.Disconnect()
}
+
+// based on https://github.com/c2h5oh/datasize/blob/master/datasize.go
+func parseSize(sSize string) (uint64, error) {
+ sizeParts := sizeRegex.FindStringSubmatch(sSize)
+
+ if len(sizeParts) > 2 {
+ numPart, err := strconv.ParseInt(sizeParts[1], 10, 64)
+ if err != nil {
+ return 0, err
+ }
+
+ var divisor uint64
+ val := uint64(numPart)
+
+ if len(sizeParts[2]) > 0 {
+ switch sizeParts[2][0] {
+ case 'B':
+ divisor = 1
+ case 'K':
+ divisor = KB
+ case 'M':
+ divisor = MB
+ case 'G':
+ divisor = GB
+ case 'T':
+ divisor = TB
+ case 'P':
+ divisor = PB
+ case 'E':
+ divisor = EB
+ }
+ }
+
+ if divisor == 0 {
+ return 0, &strconv.NumError{"Wrong suffix", sSize, strconv.ErrSyntax}
+ }
+ if val > maxUint64/divisor {
+ return 0, &strconv.NumError{"Overflow", sSize, strconv.ErrRange}
+ }
+ return val * divisor, nil
+ }
+
+ return 0, &strconv.NumError{"Not enough parts", sSize, strconv.ErrSyntax}
+}
diff --git a/xmpp/component_test.go b/xmpp/component_test.go
new file mode 100644
index 0000000..7e7e5e7
--- /dev/null
+++ b/xmpp/component_test.go
@@ -0,0 +1,47 @@
+package xmpp
+
+import (
+ "testing"
+)
+
+func TestParseSizeGarbage(t *testing.T) {
+ _, err := parseSize("abc")
+ if err == nil {
+ t.Error("abc should not be accepted")
+ }
+}
+
+func TestParseSizeAsphalt(t *testing.T) {
+ size, err := parseSize("2B")
+ if size != 2 {
+ t.Errorf("Error parsing two bytes: %v %v", size, err)
+ }
+}
+
+func TestParseSize9K(t *testing.T) {
+ size, err := parseSize("9 KB")
+ if size != 9216 {
+ t.Errorf("Error parsing 9K: %v %v", size, err)
+ }
+}
+
+func TestParseSizeBits(t *testing.T) {
+ size, err := parseSize("9 Kb")
+ if err == nil {
+ t.Errorf("Error parsing kilobits: %v %v", size, err)
+ }
+}
+
+func TestParseSizeEB(t *testing.T) {
+ size, err := parseSize("3EB")
+ if size != 3458764513820540928 {
+ t.Errorf("Error parsing exabytes: %v %v", size, err)
+ }
+}
+
+func TestParseSizeOverflow(t *testing.T) {
+ size, err := parseSize("314EB")
+ if err == nil {
+ t.Errorf("Overflow is not overflowing: %v %v", size, err)
+ }
+}
diff --git a/xmpp/gateway/storage.go b/xmpp/gateway/storage.go
new file mode 100644
index 0000000..a3fad79
--- /dev/null
+++ b/xmpp/gateway/storage.go
@@ -0,0 +1,89 @@
+package gateway
+
+import (
+ "io/ioutil"
+ "os"
+ "sort"
+ "sync"
+
+ log "github.com/sirupsen/logrus"
+)
+
+// StorageQuota is a value from config parsed to bytes number
+var StorageQuota uint64
+// CachedStorageSize estimates the storage size between full rescans
+var CachedStorageSize uint64
+var StorageLock = sync.Mutex{}
+
+// MeasureStorageSize replaces the estimated storage size with relevant data from the filesystem
+func MeasureStorageSize(path string) {
+ dents, err := ioutil.ReadDir(path)
+ if err != nil {
+ return
+ }
+
+ var total uint64
+ for _, fi := range dents {
+ if !fi.IsDir() {
+ total += uint64(fi.Size())
+ }
+ }
+
+ if total != CachedStorageSize {
+ if CachedStorageSize > 0 {
+ log.Warnf("Correcting cached storage size: was %v, actually %v", CachedStorageSize, total)
+ }
+ CachedStorageSize = total
+ }
+}
+
+// CleanOldFiles purges the oldest files in a directory that exceed the limit
+func CleanOldFiles(path string, limit uint64) {
+ dents, err := ioutil.ReadDir(path)
+ if err != nil {
+ return
+ }
+
+ var total uint64
+ for _, fi := range dents {
+ if !fi.IsDir() {
+ total += uint64(fi.Size())
+ }
+ }
+
+ // sort by time
+ sort.Slice(dents, func(i int, j int) bool {
+ return dents[i].ModTime().Before(dents[j].ModTime())
+ })
+
+ // purge
+ if total > limit {
+ toPurge := total - limit
+
+ var purgedAmount uint64
+ var purgedCount uint64
+
+ for _, fi := range dents {
+ if !fi.IsDir() {
+ err = os.Remove(path + string(os.PathSeparator) + fi.Name())
+ if err != nil {
+ log.Errorf("Couldn't remove %v: %v", fi.Name(), err)
+ continue
+ }
+
+ purgedAmount += uint64(fi.Size())
+ purgedCount += 1
+ if purgedAmount >= toPurge {
+ break
+ }
+ }
+ }
+
+ log.Infof("Cleaned %v bytes of %v old files", purgedAmount, purgedCount)
+ if CachedStorageSize > purgedAmount {
+ CachedStorageSize -= purgedAmount
+ } else {
+ CachedStorageSize = 0
+ }
+ }
+}