99 lines
3 KiB
Python
Executable file
99 lines
3 KiB
Python
Executable file
#!/usr/bin/env python3
|
|
|
|
import json
import os
import shlex
import subprocess
from pathlib import Path

import backup_manager
|
|
|
|
|
|
def read_config():
    """Load the Forgejo backup configuration from the secrets file.

    The file is looked up at ``secrets/forgejo.json`` relative to the
    directory that contains this script (with symlinks resolved), so the
    lookup does not depend on the current working directory.

    Returns:
        The parsed JSON document, exactly as produced by ``json.load``.

    Raises:
        OSError: If the secrets file cannot be opened.
        ValueError: If the file is not valid UTF-8 or not valid JSON
            (``json.JSONDecodeError`` and ``UnicodeDecodeError`` are both
            subclasses of ``ValueError``).
    """
    script_dir = Path(__file__).resolve().parent
    secrets_file = script_dir / "secrets/forgejo.json"
    # JSON is UTF-8 by specification; do not rely on the locale's default
    # encoding, which would garble non-ASCII secrets on non-UTF-8 systems.
    with open(secrets_file, "r", encoding="utf-8") as f:
        return json.load(f)
|
|
|
|
|
|
class ForgejoManager(backup_manager.BackupManager):
    """Backup manager for a Forgejo deployment driven by ``docker compose``.

    Every command is run through the shell in the current working directory,
    so the caller must change into the compose project directory first.
    Command failures are reported through ``self._gotify`` and signalled to
    the caller with a ``False`` return value instead of an exception.

    NOTE(review): ``self._gotify`` and ``self._common`` are not set in this
    class; presumably the base class creates them from ``config`` -- confirm.
    """

    def __init__(self, config):
        """Pass the parsed configuration on to the base manager."""
        super().__init__(config=config)

    # Double underscore on purpose: name mangling guarantees this helper can
    # never collide with (or override) an attribute of the base class, whose
    # source is not part of this file.
    def __run_compose(self, cmd, failure_message):
        """Run a shell command and report a non-zero exit status.

        Args:
            cmd: The complete command line, executed with ``shell=True``.
            failure_message: Text handed to the notifier when the command
                exits with a non-zero status.

        Returns:
            ``True`` if the command exited with status 0, ``False`` otherwise.
        """
        try:
            subprocess.run(
                cmd, shell=True, text=True, check=True, capture_output=True
            )
        except subprocess.CalledProcessError as e:
            self._gotify.send_subprocess_error(failure_message, error=e)
            return False
        return True

    def stop_all_services(self):
        """Run ``docker compose stop``; return ``True`` on success."""
        return self.__run_compose(
            "docker compose stop", "Failed to stop services"
        )

    def start_all_services(self):
        """Run ``docker compose up -d``; return ``True`` on success."""
        return self.__run_compose(
            "docker compose up -d", "Failed to start services"
        )

    def start_database(self):
        """Run ``docker compose up -d db``; return ``True`` on success."""
        return self.__run_compose(
            "docker compose up -d db", "Failed to start database service"
        )

    def dump_database(self):
        """Dump the database into ``mysql/forgejo.sql`` with ``mariadb-dump``.

        The credentials and database name are read from ``self._common``
        (keys ``MYSQL_PASSWORD``, ``MYSQL_USER`` and ``MYSQL_DB``). The dump
        runs inside the ``db`` compose service and its output is redirected
        by the host shell into ``mysql/forgejo.sql``, relative to the current
        working directory.

        Returns:
            ``True`` if the dump command exited with status 0 (a success
            notification is sent), ``False`` otherwise (an error notification
            is sent).

        Raises:
            KeyError: If one of the three keys is missing from
                ``self._common``.
        """
        # The command goes through the shell, so every interpolated value is
        # quoted: a password containing spaces, ``$``, ``;``, ``&`` or quotes
        # would otherwise break the command or be interpreted by the shell.
        # ``str()`` keeps non-string JSON values (e.g. a numeric password)
        # working as they did with plain f-string interpolation.
        password = shlex.quote(str(self._common["MYSQL_PASSWORD"]))
        user = shlex.quote(str(self._common["MYSQL_USER"]))
        db = shlex.quote(str(self._common["MYSQL_DB"]))

        # NOTE: the ``>`` redirection truncates mysql/forgejo.sql before the
        # dump starts, so a failed dump leaves an empty or partial file.
        # NOTE(review): the password is part of the command string, which is
        # also carried by the objects handed to the notifier below -- confirm
        # that the notifier does not publish the command line.
        cmd = (
            "docker compose exec -i --user 1000:1000 db mariadb-dump"
            " --single-transaction --default-character-set=utf8mb4"
            f" -h localhost -u {user} --password={password} {db}"
            " > mysql/forgejo.sql"
        )
        try:
            result = subprocess.run(
                cmd,
                shell=True,
                check=True,
                text=True,
                capture_output=True,
            )
        except subprocess.CalledProcessError as e:
            self._gotify.send_subprocess_error("Dumping database failed", e)
            return False
        self._gotify.send_success("Database dumped", result)
        return True
|
|
|
|
|
|
def main():
    """Run one complete Forgejo backup cycle and exit with a status code.

    Steps, in order:

    1. Read the configuration and change into
       ``config["common"]["BACKUP_DOCKER_DIR"]``.
    2. Stop all compose services; exit with status 1 if that fails.
    3. Start only the database, dump it, then call ``borg_backup()``; the
       first step that reports failure skips the remaining ones.
    4. Start all services again, whether or not step 3 succeeded.

    The process always ends by raising ``SystemExit``: status 0 only if every
    step succeeded, status 1 otherwise.

    NOTE(review): ``borg_backup`` is not defined in this file; presumably it
    is inherited from the base manager class -- confirm.
    """
    config = read_config()
    os.chdir(config["common"]["BACKUP_DOCKER_DIR"])
    # Named ``manager`` so the imported ``backup_manager`` module is not
    # shadowed inside this function.
    manager = ForgejoManager(config)

    # ``raise SystemExit`` instead of ``exit()``: the latter is a helper
    # injected by the ``site`` module for interactive use and is not
    # guaranteed to exist (for example under ``python -S``).
    if not manager.stop_all_services():
        raise SystemExit(1)

    backup_ok = False
    try:
        # ``and`` short-circuits, so a failing step skips the later ones.
        backup_ok = (
            manager.start_database()
            and manager.dump_database()
            and manager.borg_backup()
        )
    finally:
        # Bring the services back up even if a step raised unexpectedly, so
        # a broken backup run never leaves Forgejo stopped.
        services_ok = manager.start_all_services()

    raise SystemExit(0 if backup_ok and services_ok else 1)
|
|
|
|
|
|
# Run the backup only when this file is executed as a script, not when it is
# imported as a module.
if __name__ == "__main__":
    main()
|