817 lines
28 KiB
Python
817 lines
28 KiB
Python
import os
|
|
import sqlite3
|
|
import uuid
|
|
import hashlib
|
|
import datetime
|
|
from flask import Flask, render_template, request, redirect, url_for, flash, session, send_from_directory
|
|
from werkzeug.utils import secure_filename
|
|
from functools import wraps
|
|
|
|
app = Flask(__name__)
|
|
app.secret_key = os.environ.get('SECRET_KEY', os.urandom(24))
|
|
|
|
# Configuration
|
|
UPLOAD_FOLDER = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'uploads')
|
|
ALLOWED_EXTENSIONS = {'pdf', 'docx', 'xlsx', 'jpg', 'png', 'mp4', 'txt'}
|
|
DB_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'dms.db')
|
|
|
|
# Create upload folder if it doesn't exist
|
|
if not os.path.exists(UPLOAD_FOLDER):
|
|
os.makedirs(UPLOAD_FOLDER)
|
|
|
|
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
|
|
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max upload size
|
|
|
|
# Production configuration
|
|
app.config['PREFERRED_URL_SCHEME'] = 'https'
|
|
app.config['SESSION_COOKIE_SECURE'] = False
|
|
app.config['SESSION_COOKIE_HTTPONLY'] = True
|
|
app.config['PERMANENT_SESSION_LIFETIME'] = datetime.timedelta(days=1)
|
|
|
|
# Database initialization
|
|
def init_db():
|
|
conn = sqlite3.connect(DB_PATH)
|
|
cursor = conn.cursor()
|
|
|
|
# Create users table
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS users (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
username TEXT UNIQUE NOT NULL,
|
|
password TEXT NOT NULL,
|
|
email TEXT UNIQUE NOT NULL,
|
|
role TEXT NOT NULL,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
|
|
)
|
|
''')
|
|
|
|
# Create documents table
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS documents (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
filename TEXT NOT NULL,
|
|
original_filename TEXT NOT NULL,
|
|
custom_filename TEXT NOT NULL,
|
|
file_type TEXT NOT NULL,
|
|
file_size INTEGER NOT NULL,
|
|
category TEXT NOT NULL,
|
|
user_id INTEGER NOT NULL,
|
|
visibility TEXT NOT NULL,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
FOREIGN KEY (user_id) REFERENCES users (id)
|
|
)
|
|
''')
|
|
|
|
# Create document_shares table
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS document_shares (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
document_id INTEGER NOT NULL,
|
|
shared_by INTEGER NOT NULL,
|
|
shared_with INTEGER,
|
|
share_link TEXT,
|
|
qr_code TEXT,
|
|
expires_at TIMESTAMP,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
FOREIGN KEY (document_id) REFERENCES documents (id),
|
|
FOREIGN KEY (shared_by) REFERENCES users (id),
|
|
FOREIGN KEY (shared_with) REFERENCES users (id)
|
|
)
|
|
''')
|
|
|
|
# Create document_versions table
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS document_versions (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
document_id INTEGER NOT NULL,
|
|
filename TEXT NOT NULL,
|
|
version_number INTEGER NOT NULL,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
FOREIGN KEY (document_id) REFERENCES documents (id)
|
|
)
|
|
''')
|
|
|
|
# Create analytics table
|
|
cursor.execute('''
|
|
CREATE TABLE IF NOT EXISTS analytics (
|
|
id INTEGER PRIMARY KEY AUTOINCREMENT,
|
|
document_id INTEGER NOT NULL,
|
|
action TEXT NOT NULL,
|
|
user_id INTEGER,
|
|
ip_address TEXT,
|
|
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
|
|
FOREIGN KEY (document_id) REFERENCES documents (id),
|
|
FOREIGN KEY (user_id) REFERENCES users (id)
|
|
)
|
|
''')
|
|
|
|
# Insert default admin user if not exists
|
|
cursor.execute("SELECT * FROM users WHERE username = 'admin'")
|
|
if not cursor.fetchone():
|
|
# Password: admin123
|
|
hashed_password = hashlib.sha256("admin123".encode()).hexdigest()
|
|
cursor.execute("INSERT INTO users (username, password, email, role) VALUES (?, ?, ?, ?)",
|
|
("admin", hashed_password, "admin@example.com", "admin"))
|
|
|
|
# Insert default regular user if not exists
|
|
cursor.execute("SELECT * FROM users WHERE username = 'user'")
|
|
if not cursor.fetchone():
|
|
# Password: user123
|
|
hashed_password = hashlib.sha256("user123".encode()).hexdigest()
|
|
cursor.execute("INSERT INTO users (username, password, email, role) VALUES (?, ?, ?, ?)",
|
|
("user", hashed_password, "user@example.com", "user"))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
# Helper functions
|
|
def allowed_file(filename):
|
|
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
|
|
|
|
def get_file_type(filename):
|
|
ext = filename.rsplit('.', 1)[1].lower()
|
|
if ext in ['pdf']:
|
|
return 'pdf'
|
|
elif ext in ['docx', 'doc']:
|
|
return 'document'
|
|
elif ext in ['xlsx', 'xls']:
|
|
return 'spreadsheet'
|
|
elif ext in ['jpg', 'jpeg', 'png', 'gif']:
|
|
return 'image'
|
|
elif ext in ['mp4', 'avi', 'mov']:
|
|
return 'video'
|
|
else:
|
|
return 'other'
|
|
|
|
def get_db_connection():
|
|
conn = sqlite3.connect(DB_PATH)
|
|
conn.row_factory = sqlite3.Row
|
|
return conn
|
|
|
|
# Authentication decorator
|
|
def login_required(f):
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
if 'user_id' not in session:
|
|
flash('Please log in to access this page', 'error')
|
|
return redirect(url_for('login'))
|
|
return f(*args, **kwargs)
|
|
return decorated_function
|
|
|
|
def admin_required(f):
|
|
@wraps(f)
|
|
def decorated_function(*args, **kwargs):
|
|
if 'user_id' not in session:
|
|
flash('Please log in to access this page', 'error')
|
|
return redirect(url_for('login'))
|
|
|
|
conn = get_db_connection()
|
|
user = conn.execute('SELECT * FROM users WHERE id = ?', (session['user_id'],)).fetchone()
|
|
conn.close()
|
|
|
|
if user['role'] != 'admin':
|
|
flash('Admin access required', 'error')
|
|
return redirect(url_for('dashboard'))
|
|
|
|
return f(*args, **kwargs)
|
|
return decorated_function
|
|
|
|
# Routes
|
|
@app.route('/')
|
|
def index():
|
|
if 'user_id' in session:
|
|
return redirect(url_for('dashboard'))
|
|
return render_template('index.html')
|
|
|
|
@app.route('/login', methods=['GET', 'POST'])
|
|
def login():
|
|
if request.method == 'POST':
|
|
username = request.form['username']
|
|
password = request.form['password']
|
|
hashed_password = hashlib.sha256(password.encode()).hexdigest()
|
|
|
|
conn = get_db_connection()
|
|
user = conn.execute('SELECT * FROM users WHERE username = ? AND password = ?',
|
|
(username, hashed_password)).fetchone()
|
|
conn.close()
|
|
|
|
if user:
|
|
session['user_id'] = user['id']
|
|
session['username'] = user['username']
|
|
session['role'] = user['role']
|
|
flash(f'Welcome back, {username}!', 'success')
|
|
return redirect(url_for('dashboard'))
|
|
else:
|
|
flash('Invalid username or password', 'error')
|
|
return render_template('login.html')
|
|
else:
|
|
return render_template('login.html')
|
|
|
|
@app.route('/logout')
|
|
def logout():
|
|
session.clear()
|
|
flash('You have been logged out', 'info')
|
|
return redirect(url_for('index'))
|
|
|
|
@app.route('/dashboard')
|
|
@login_required
|
|
def dashboard():
|
|
conn = get_db_connection()
|
|
|
|
# Get document counts by category
|
|
category_counts = conn.execute('''
|
|
SELECT category, COUNT(*) as count
|
|
FROM documents
|
|
WHERE user_id = ? OR visibility = "public"
|
|
OR id IN (SELECT document_id FROM document_shares WHERE shared_with = ?)
|
|
GROUP BY category
|
|
''', (session['user_id'], session['user_id'])).fetchall()
|
|
|
|
# Get recent documents (limited to 5)
|
|
recent_documents = conn.execute('''
|
|
SELECT * FROM documents
|
|
WHERE user_id = ? OR visibility = "public"
|
|
OR id IN (SELECT document_id FROM document_shares WHERE shared_with = ?)
|
|
ORDER BY created_at DESC
|
|
LIMIT 5
|
|
''', (session['user_id'], session['user_id'])).fetchall()
|
|
|
|
# Get analytics data for admin
|
|
if session['role'] == 'admin':
|
|
analytics = conn.execute('''
|
|
SELECT documents.custom_filename, COUNT(analytics.id) as view_count
|
|
FROM documents
|
|
LEFT JOIN analytics ON documents.id = analytics.document_id
|
|
WHERE analytics.action = 'view'
|
|
GROUP BY documents.id
|
|
ORDER BY view_count DESC
|
|
LIMIT 5
|
|
''').fetchall()
|
|
|
|
# Get user activity
|
|
user_activity = conn.execute('''
|
|
SELECT users.username, COUNT(analytics.id) as action_count
|
|
FROM analytics
|
|
JOIN users ON analytics.user_id = users.id
|
|
GROUP BY analytics.user_id
|
|
ORDER BY action_count DESC
|
|
LIMIT 5
|
|
''').fetchall()
|
|
else:
|
|
analytics = None
|
|
user_activity = None
|
|
|
|
conn.close()
|
|
|
|
return render_template('dashboard.html',
|
|
recent_documents=recent_documents,
|
|
category_counts=category_counts,
|
|
analytics=analytics,
|
|
user_activity=user_activity)
|
|
|
|
@app.route('/upload', methods=['GET', 'POST'])
|
|
@login_required
|
|
def upload():
|
|
if request.method == 'POST':
|
|
# Check if the post request has the file part
|
|
if 'file' not in request.files:
|
|
flash('No file part', 'error')
|
|
return redirect(request.url)
|
|
|
|
file = request.files['file']
|
|
visibility = request.form.get('visibility', 'private')
|
|
custom_filename = request.form.get('custom_filename', '').strip()
|
|
category = request.form.get('category', 'general')
|
|
|
|
# If user does not select file, browser also
|
|
# submit an empty part without filename
|
|
if file.filename == '':
|
|
flash('No selected file', 'error')
|
|
return redirect(request.url)
|
|
|
|
if file and allowed_file(file.filename):
|
|
# Generate unique filename
|
|
original_filename = secure_filename(file.filename)
|
|
file_extension = original_filename.rsplit('.', 1)[1].lower()
|
|
unique_filename = f"{uuid.uuid4().hex}.{file_extension}"
|
|
|
|
# Use custom filename if provided, otherwise use original
|
|
if not custom_filename:
|
|
custom_filename = os.path.splitext(original_filename)[0]
|
|
|
|
# Save the file
|
|
file_path = os.path.join(app.config['UPLOAD_FOLDER'], unique_filename)
|
|
file.save(file_path)
|
|
|
|
# Get file size
|
|
file_size = os.path.getsize(file_path)
|
|
|
|
# Save to database
|
|
conn = get_db_connection()
|
|
cursor = conn.cursor()
|
|
cursor.execute('''
|
|
INSERT INTO documents (filename, original_filename, custom_filename, file_type, file_size, category, user_id, visibility)
|
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
|
''', (unique_filename, original_filename, custom_filename, get_file_type(original_filename), file_size, category, session['user_id'], visibility))
|
|
|
|
document_id = cursor.lastrowid
|
|
|
|
# Add initial version
|
|
cursor.execute('''
|
|
INSERT INTO document_versions (document_id, filename, version_number)
|
|
VALUES (?, ?, ?)
|
|
''', (document_id, unique_filename, 1))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
flash('File successfully uploaded', 'success')
|
|
return redirect(url_for('files'))
|
|
else:
|
|
flash(f'Allowed file types are: {", ".join(ALLOWED_EXTENSIONS)}', 'error')
|
|
|
|
# Get available categories
|
|
categories = ['admin', 'accounting', 'hr', 'marketing', 'legal', 'general', 'qoutation', 'other']
|
|
|
|
return render_template('upload.html', categories=categories)
|
|
|
|
@app.route('/files')
|
|
@login_required
|
|
def files():
|
|
# Get query parameters
|
|
search_query = request.args.get('search', '')
|
|
selected_category = request.args.get('category', 'all')
|
|
page = request.args.get('page', 1, type=int)
|
|
per_page = 10 # Number of documents per page
|
|
|
|
conn = get_db_connection()
|
|
|
|
# Base query
|
|
query = '''
|
|
SELECT * FROM documents
|
|
WHERE (user_id = ? OR visibility = "public"
|
|
OR id IN (SELECT document_id FROM document_shares WHERE shared_with = ?))
|
|
'''
|
|
params = [session['user_id'], session['user_id']]
|
|
|
|
# Add search condition if search query provided
|
|
if search_query:
|
|
query += ' AND (custom_filename LIKE ? OR original_filename LIKE ?)'
|
|
params.extend(['%' + search_query + '%', '%' + search_query + '%'])
|
|
|
|
# Add category filter if specific category selected
|
|
if selected_category != 'all':
|
|
query += ' AND category = ?'
|
|
params.append(selected_category)
|
|
|
|
# Count total matching documents
|
|
count_query = query.replace('SELECT *', 'SELECT COUNT(*)')
|
|
total_docs = conn.execute(count_query, params).fetchone()[0]
|
|
|
|
# Add pagination
|
|
query += ' ORDER BY created_at DESC LIMIT ? OFFSET ?'
|
|
offset = (page - 1) * per_page
|
|
params.extend([per_page, offset])
|
|
|
|
# Execute final query
|
|
documents = conn.execute(query, params).fetchall()
|
|
|
|
# Get available categories for filter dropdown
|
|
categories = ['admin', 'accounting', 'hr', 'marketing', 'legal', 'general', 'other']
|
|
|
|
# Calculate pagination info
|
|
total_pages = (total_docs + per_page - 1) // per_page # Ceiling division
|
|
pagination = {
|
|
'page': page,
|
|
'per_page': per_page,
|
|
'total': total_docs,
|
|
'pages': total_pages
|
|
}
|
|
|
|
conn.close()
|
|
|
|
return render_template('files.html',
|
|
documents=documents,
|
|
categories=categories,
|
|
selected_category=selected_category,
|
|
search_query=search_query,
|
|
pagination=pagination)
|
|
|
|
@app.route('/document/<int:document_id>/edit', methods=['GET', 'POST'])
|
|
@login_required
|
|
def edit_document(document_id):
|
|
conn = get_db_connection()
|
|
|
|
# Get document
|
|
document = conn.execute('SELECT * FROM documents WHERE id = ?', (document_id,)).fetchone()
|
|
|
|
if not document:
|
|
conn.close()
|
|
flash('Document not found', 'error')
|
|
return redirect(url_for('files'))
|
|
|
|
# Check ownership
|
|
if document['user_id'] != session['user_id'] and session['role'] != 'admin':
|
|
conn.close()
|
|
flash('You do not have permission to edit this document', 'error')
|
|
return redirect(url_for('files'))
|
|
|
|
# Get available categories
|
|
categories = ['admin', 'accounting', 'hr', 'marketing', 'legal', 'general', 'other']
|
|
|
|
if request.method == 'POST':
|
|
custom_filename = request.form.get('custom_filename', '').strip()
|
|
category = request.form.get('category', 'general')
|
|
visibility = request.form.get('visibility', 'private')
|
|
|
|
# Use custom filename if provided, otherwise keep existing
|
|
if not custom_filename:
|
|
custom_filename = document['custom_filename']
|
|
|
|
# Update document
|
|
conn.execute('''
|
|
UPDATE documents
|
|
SET custom_filename = ?, category = ?, visibility = ?, updated_at = CURRENT_TIMESTAMP
|
|
WHERE id = ?
|
|
''', (custom_filename, category, visibility, document_id))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
flash('Document updated successfully', 'success')
|
|
return redirect(url_for('view_document', document_id=document_id))
|
|
|
|
conn.close()
|
|
|
|
return render_template('edit_document.html', document=document, categories=categories)
|
|
|
|
@app.route('/document/<int:document_id>')
|
|
@login_required
|
|
def view_document(document_id):
|
|
conn = get_db_connection()
|
|
|
|
# Get document
|
|
document = conn.execute('SELECT * FROM documents WHERE id = ?', (document_id,)).fetchone()
|
|
|
|
if not document:
|
|
conn.close()
|
|
flash('Document not found', 'error')
|
|
return redirect(url_for('dashboard'))
|
|
|
|
# Check permissions
|
|
if document['visibility'] != 'public' and document['user_id'] != session['user_id']:
|
|
# Check if shared with user
|
|
shared = conn.execute('''
|
|
SELECT * FROM document_shares
|
|
WHERE document_id = ? AND shared_with = ?
|
|
''', (document_id, session['user_id'])).fetchone()
|
|
|
|
if not shared and session['role'] != 'admin':
|
|
conn.close()
|
|
flash('You do not have permission to view this document', 'error')
|
|
return redirect(url_for('dashboard'))
|
|
|
|
# Record view in analytics
|
|
conn.execute('''
|
|
INSERT INTO analytics (document_id, action, user_id, ip_address)
|
|
VALUES (?, ?, ?, ?)
|
|
''', (document_id, 'view', session['user_id'], request.remote_addr))
|
|
|
|
# Get document versions
|
|
versions = conn.execute('''
|
|
SELECT * FROM document_versions
|
|
WHERE document_id = ?
|
|
ORDER BY version_number DESC
|
|
''', (document_id,)).fetchall()
|
|
|
|
# Get share information
|
|
shares = conn.execute('''
|
|
SELECT document_shares.*, users.username
|
|
FROM document_shares
|
|
LEFT JOIN users ON document_shares.shared_with = users.id
|
|
WHERE document_id = ?
|
|
''', (document_id,)).fetchall()
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return render_template('view_document.html', document=document, versions=versions, shares=shares)
|
|
|
|
|
|
@app.route('/document/<int:document_id>/file')
|
|
@login_required
|
|
def serve_document_file(document_id):
|
|
conn = get_db_connection()
|
|
|
|
# Get document
|
|
document = conn.execute('SELECT * FROM documents WHERE id = ?', (document_id,)).fetchone()
|
|
|
|
if not document:
|
|
conn.close()
|
|
flash('Document not found', 'error')
|
|
return redirect(url_for('dashboard'))
|
|
|
|
# Check permissions
|
|
if document['visibility'] != 'public' and document['user_id'] != session['user_id']:
|
|
shared = conn.execute('''
|
|
SELECT * FROM document_shares
|
|
WHERE document_id = ? AND shared_with = ?
|
|
''', (document_id, session['user_id'])).fetchone()
|
|
|
|
if not shared and session['role'] != 'admin':
|
|
conn.close()
|
|
flash('You do not have permission to view this document', 'error')
|
|
return redirect(url_for('dashboard'))
|
|
|
|
try:
|
|
# Serve the document without download (viewable in browser)
|
|
return send_from_directory(app.config['UPLOAD_FOLDER'], document['filename'],
|
|
as_attachment=False, download_name=document['original_filename'])
|
|
except FileNotFoundError:
|
|
flash('Document file not found', 'error')
|
|
conn.close()
|
|
return redirect(url_for('dashboard'))
|
|
finally:
|
|
conn.close()
|
|
|
|
|
|
@app.route('/document/<int:document_id>/download')
|
|
@login_required
|
|
def download_document(document_id):
|
|
conn = get_db_connection()
|
|
|
|
# Get document
|
|
document = conn.execute('SELECT * FROM documents WHERE id = ?', (document_id,)).fetchone()
|
|
|
|
if not document:
|
|
conn.close()
|
|
flash('Document not found', 'error')
|
|
return redirect(url_for('dashboard'))
|
|
|
|
# Check permissions
|
|
if document['visibility'] != 'public' and document['user_id'] != session['user_id']:
|
|
# Check if shared with user
|
|
shared = conn.execute('''
|
|
SELECT * FROM document_shares
|
|
WHERE document_id = ? AND shared_with = ?
|
|
''', (document_id, session['user_id'])).fetchone()
|
|
|
|
if not shared and session['role'] != 'admin':
|
|
conn.close()
|
|
flash('You do not have permission to download this document', 'error')
|
|
return redirect(url_for('dashboard'))
|
|
|
|
# Record download in analytics
|
|
conn.execute('''
|
|
INSERT INTO analytics (document_id, action, user_id, ip_address)
|
|
VALUES (?, ?, ?, ?)
|
|
''', (document_id, 'download', session['user_id'], request.remote_addr))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return send_from_directory(app.config['UPLOAD_FOLDER'], document['filename'],
|
|
as_attachment=True, download_name=document['original_filename'])
|
|
|
|
@app.route('/document/<int:document_id>/share', methods=['GET', 'POST'])
|
|
@login_required
|
|
def share_document(document_id):
|
|
conn = get_db_connection()
|
|
|
|
# Get document
|
|
document = conn.execute('SELECT * FROM documents WHERE id = ?', (document_id,)).fetchone()
|
|
|
|
if not document:
|
|
conn.close()
|
|
flash('Document not found', 'error')
|
|
return redirect(url_for('dashboard'))
|
|
|
|
# Check ownership
|
|
if document['user_id'] != session['user_id'] and session['role'] != 'admin':
|
|
conn.close()
|
|
flash('You do not have permission to share this document', 'error')
|
|
return redirect(url_for('dashboard'))
|
|
|
|
if request.method == 'POST':
|
|
share_type = request.form.get('share_type')
|
|
|
|
if share_type == 'user':
|
|
# Share with specific user
|
|
username = request.form.get('username')
|
|
user = conn.execute('SELECT * FROM users WHERE username = ?', (username,)).fetchone()
|
|
|
|
if not user:
|
|
flash(f'User {username} not found', 'error')
|
|
else:
|
|
# Check if already shared
|
|
existing = conn.execute('''
|
|
SELECT * FROM document_shares
|
|
WHERE document_id = ? AND shared_with = ?
|
|
''', (document_id, user['id'])).fetchone()
|
|
|
|
if existing:
|
|
flash(f'Document already shared with {username}', 'info')
|
|
else:
|
|
conn.execute('''
|
|
INSERT INTO document_shares (document_id, shared_by, shared_with)
|
|
VALUES (?, ?, ?)
|
|
''', (document_id, session['user_id'], user['id']))
|
|
flash(f'Document shared with {username}', 'success')
|
|
|
|
elif share_type == 'link':
|
|
# Generate shareable link
|
|
expiry_days = int(request.form.get('expiry', 7))
|
|
expires_at = datetime.datetime.now() + datetime.timedelta(days=expiry_days)
|
|
share_token = uuid.uuid4().hex
|
|
|
|
conn.execute('''
|
|
INSERT INTO document_shares (document_id, shared_by, share_link, expires_at)
|
|
VALUES (?, ?, ?, ?)
|
|
''', (document_id, session['user_id'], share_token, expires_at))
|
|
|
|
flash(f'Shareable link created (expires in {expiry_days} days)', 'success')
|
|
|
|
conn.commit()
|
|
|
|
# Get users for sharing
|
|
users = conn.execute('SELECT * FROM users WHERE id != ?', (session['user_id'],)).fetchall()
|
|
|
|
# Get existing shares
|
|
shares = conn.execute('''
|
|
SELECT document_shares.*, users.username
|
|
FROM document_shares
|
|
LEFT JOIN users ON document_shares.shared_with = users.id
|
|
WHERE document_id = ?
|
|
''', (document_id,)).fetchall()
|
|
|
|
conn.close()
|
|
|
|
return render_template('share_document.html', document=document, users=users, shares=shares)
|
|
|
|
@app.route('/document/<int:document_id>/delete')
|
|
@login_required
|
|
def delete_document(document_id):
|
|
conn = get_db_connection()
|
|
|
|
# Get document
|
|
document = conn.execute('SELECT * FROM documents WHERE id = ?', (document_id,)).fetchone()
|
|
|
|
if not document:
|
|
conn.close()
|
|
flash('Document not found', 'error')
|
|
return redirect(url_for('dashboard'))
|
|
|
|
# Check ownership
|
|
if document['user_id'] != session['user_id'] and session['role'] != 'admin':
|
|
conn.close()
|
|
flash('You do not have permission to delete this document', 'error')
|
|
return redirect(url_for('dashboard'))
|
|
|
|
# Get all versions
|
|
versions = conn.execute('SELECT * FROM document_versions WHERE document_id = ?', (document_id,)).fetchall()
|
|
|
|
# Delete files
|
|
for version in versions:
|
|
file_path = os.path.join(app.config['UPLOAD_FOLDER'], version['filename'])
|
|
if os.path.exists(file_path):
|
|
os.remove(file_path)
|
|
|
|
# Delete from database (cascade will handle related records)
|
|
conn.execute('DELETE FROM analytics WHERE document_id = ?', (document_id,))
|
|
conn.execute('DELETE FROM document_shares WHERE document_id = ?', (document_id,))
|
|
conn.execute('DELETE FROM document_versions WHERE document_id = ?', (document_id,))
|
|
conn.execute('DELETE FROM documents WHERE id = ?', (document_id,))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
flash('Document deleted successfully', 'success')
|
|
return redirect(url_for('dashboard'))
|
|
|
|
@app.route('/admin')
|
|
@admin_required
|
|
def admin_panel():
|
|
conn = get_db_connection()
|
|
|
|
# Get all users
|
|
users = conn.execute('SELECT * FROM users').fetchall()
|
|
|
|
# Get document statistics
|
|
doc_stats = conn.execute('''
|
|
SELECT
|
|
COUNT(*) as total_documents,
|
|
SUM(file_size) as total_size,
|
|
COUNT(DISTINCT user_id) as total_contributors
|
|
FROM documents
|
|
''').fetchone()
|
|
|
|
# Get activity statistics
|
|
activity_stats = conn.execute('''
|
|
SELECT
|
|
COUNT(*) as total_actions,
|
|
COUNT(DISTINCT user_id) as active_users,
|
|
COUNT(DISTINCT document_id) as accessed_documents
|
|
FROM analytics
|
|
''').fetchone()
|
|
|
|
conn.close()
|
|
|
|
return render_template('admin.html', users=users, doc_stats=doc_stats, activity_stats=activity_stats)
|
|
|
|
@app.route('/shared/<share_token>')
|
|
def access_shared(share_token):
|
|
conn = get_db_connection()
|
|
|
|
# Get share information
|
|
share = conn.execute('''
|
|
SELECT document_shares.*, documents.*
|
|
FROM document_shares
|
|
JOIN documents ON document_shares.document_id = documents.id
|
|
WHERE document_shares.share_link = ?
|
|
''', (share_token,)).fetchone()
|
|
|
|
if not share:
|
|
conn.close()
|
|
flash('Invalid or expired share link', 'error')
|
|
return redirect(url_for('index'))
|
|
|
|
# Check if expired
|
|
if share['expires_at'] and datetime.datetime.strptime(share['expires_at'], '%Y-%m-%d %H:%M:%S.%f') < datetime.datetime.now():
|
|
conn.close()
|
|
flash('This share link has expired', 'error')
|
|
return redirect(url_for('index'))
|
|
|
|
# Record view in analytics
|
|
user_id = session.get('user_id')
|
|
conn.execute('''
|
|
INSERT INTO analytics (document_id, action, user_id, ip_address)
|
|
VALUES (?, ?, ?, ?)
|
|
''', (share['document_id'], 'shared_view', user_id, request.remote_addr))
|
|
|
|
conn.commit()
|
|
|
|
# Get document versions
|
|
versions = conn.execute('''
|
|
SELECT * FROM document_versions
|
|
WHERE document_id = ?
|
|
ORDER BY version_number DESC
|
|
''', (share['document_id'],)).fetchall()
|
|
|
|
conn.close()
|
|
|
|
return render_template('shared_view.html', document=share, versions=versions, share_token=share_token)
|
|
|
|
@app.route('/shared/<share_token>/download')
|
|
def download_shared(share_token):
|
|
conn = get_db_connection()
|
|
|
|
# Get share information
|
|
share = conn.execute('''
|
|
SELECT document_shares.*, documents.*
|
|
FROM document_shares
|
|
JOIN documents ON document_shares.document_id = documents.id
|
|
WHERE document_shares.share_link = ?
|
|
''', (share_token,)).fetchone()
|
|
|
|
if not share:
|
|
conn.close()
|
|
flash('Invalid or expired share link', 'error')
|
|
return redirect(url_for('index'))
|
|
|
|
# Check if expired
|
|
if share['expires_at'] and datetime.datetime.strptime(share['expires_at'], '%Y-%m-%d %H:%M:%S.%f') < datetime.datetime.now():
|
|
conn.close()
|
|
flash('This share link has expired', 'error')
|
|
return redirect(url_for('index'))
|
|
|
|
# Record download in analytics
|
|
user_id = session.get('user_id')
|
|
conn.execute('''
|
|
INSERT INTO analytics (document_id, action, user_id, ip_address)
|
|
VALUES (?, ?, ?, ?)
|
|
''', (share['document_id'], 'shared_download', user_id, request.remote_addr))
|
|
|
|
conn.commit()
|
|
conn.close()
|
|
|
|
return send_from_directory(app.config['UPLOAD_FOLDER'], share['filename'],
|
|
as_attachment=True, download_name=share['original_filename'])
|
|
|
|
# Error handlers
|
|
@app.errorhandler(404)
|
|
def page_not_found(e):
|
|
return render_template('error.html', error_code=404, error_message="Page not found"), 404
|
|
|
|
@app.errorhandler(500)
|
|
def internal_server_error(e):
|
|
return render_template('error.html', error_code=500, error_message="Internal server error"), 500
|
|
|
|
# Initialize database and start app
|
|
if __name__ == '__main__':
|
|
init_db()
|
|
app.run(host='0.0.0.0', port=1000, debug=False)
|
|
else:
|
|
# For production WSGI servers
|
|
init_db()
|