DMS/app.py
2025-05-01 00:24:26 +08:00

817 lines
28 KiB
Python

import os
import sqlite3
import uuid
import hashlib
import datetime
from flask import Flask, render_template, request, redirect, url_for, flash, session, send_from_directory
from werkzeug.utils import secure_filename
from functools import wraps
app = Flask(__name__)
app.secret_key = os.environ.get('SECRET_KEY', os.urandom(24))
# Configuration
UPLOAD_FOLDER = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'uploads')
ALLOWED_EXTENSIONS = {'pdf', 'docx', 'xlsx', 'jpg', 'png', 'mp4', 'txt'}
DB_PATH = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'dms.db')
# Create upload folder if it doesn't exist
if not os.path.exists(UPLOAD_FOLDER):
os.makedirs(UPLOAD_FOLDER)
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16MB max upload size
# Production configuration
app.config['PREFERRED_URL_SCHEME'] = 'https'
app.config['SESSION_COOKIE_SECURE'] = False
app.config['SESSION_COOKIE_HTTPONLY'] = True
app.config['PERMANENT_SESSION_LIFETIME'] = datetime.timedelta(days=1)
# Database initialization
def init_db():
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
# Create users table
cursor.execute('''
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
username TEXT UNIQUE NOT NULL,
password TEXT NOT NULL,
email TEXT UNIQUE NOT NULL,
role TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
# Create documents table
cursor.execute('''
CREATE TABLE IF NOT EXISTS documents (
id INTEGER PRIMARY KEY AUTOINCREMENT,
filename TEXT NOT NULL,
original_filename TEXT NOT NULL,
custom_filename TEXT NOT NULL,
file_type TEXT NOT NULL,
file_size INTEGER NOT NULL,
category TEXT NOT NULL,
user_id INTEGER NOT NULL,
visibility TEXT NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users (id)
)
''')
# Create document_shares table
cursor.execute('''
CREATE TABLE IF NOT EXISTS document_shares (
id INTEGER PRIMARY KEY AUTOINCREMENT,
document_id INTEGER NOT NULL,
shared_by INTEGER NOT NULL,
shared_with INTEGER,
share_link TEXT,
qr_code TEXT,
expires_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (document_id) REFERENCES documents (id),
FOREIGN KEY (shared_by) REFERENCES users (id),
FOREIGN KEY (shared_with) REFERENCES users (id)
)
''')
# Create document_versions table
cursor.execute('''
CREATE TABLE IF NOT EXISTS document_versions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
document_id INTEGER NOT NULL,
filename TEXT NOT NULL,
version_number INTEGER NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (document_id) REFERENCES documents (id)
)
''')
# Create analytics table
cursor.execute('''
CREATE TABLE IF NOT EXISTS analytics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
document_id INTEGER NOT NULL,
action TEXT NOT NULL,
user_id INTEGER,
ip_address TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (document_id) REFERENCES documents (id),
FOREIGN KEY (user_id) REFERENCES users (id)
)
''')
# Insert default admin user if not exists
cursor.execute("SELECT * FROM users WHERE username = 'admin'")
if not cursor.fetchone():
# Password: admin123
hashed_password = hashlib.sha256("admin123".encode()).hexdigest()
cursor.execute("INSERT INTO users (username, password, email, role) VALUES (?, ?, ?, ?)",
("admin", hashed_password, "admin@example.com", "admin"))
# Insert default regular user if not exists
cursor.execute("SELECT * FROM users WHERE username = 'user'")
if not cursor.fetchone():
# Password: user123
hashed_password = hashlib.sha256("user123".encode()).hexdigest()
cursor.execute("INSERT INTO users (username, password, email, role) VALUES (?, ?, ?, ?)",
("user", hashed_password, "user@example.com", "user"))
conn.commit()
conn.close()
# Helper functions
def allowed_file(filename):
return '.' in filename and filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS
def get_file_type(filename):
ext = filename.rsplit('.', 1)[1].lower()
if ext in ['pdf']:
return 'pdf'
elif ext in ['docx', 'doc']:
return 'document'
elif ext in ['xlsx', 'xls']:
return 'spreadsheet'
elif ext in ['jpg', 'jpeg', 'png', 'gif']:
return 'image'
elif ext in ['mp4', 'avi', 'mov']:
return 'video'
else:
return 'other'
def get_db_connection():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
# Authentication decorator
def login_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if 'user_id' not in session:
flash('Please log in to access this page', 'error')
return redirect(url_for('login'))
return f(*args, **kwargs)
return decorated_function
def admin_required(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if 'user_id' not in session:
flash('Please log in to access this page', 'error')
return redirect(url_for('login'))
conn = get_db_connection()
user = conn.execute('SELECT * FROM users WHERE id = ?', (session['user_id'],)).fetchone()
conn.close()
if user['role'] != 'admin':
flash('Admin access required', 'error')
return redirect(url_for('dashboard'))
return f(*args, **kwargs)
return decorated_function
# Routes
@app.route('/')
def index():
if 'user_id' in session:
return redirect(url_for('dashboard'))
return render_template('index.html')
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'POST':
username = request.form['username']
password = request.form['password']
hashed_password = hashlib.sha256(password.encode()).hexdigest()
conn = get_db_connection()
user = conn.execute('SELECT * FROM users WHERE username = ? AND password = ?',
(username, hashed_password)).fetchone()
conn.close()
if user:
session['user_id'] = user['id']
session['username'] = user['username']
session['role'] = user['role']
flash(f'Welcome back, {username}!', 'success')
return redirect(url_for('dashboard'))
else:
flash('Invalid username or password', 'error')
return render_template('login.html')
else:
return render_template('login.html')
@app.route('/logout')
def logout():
session.clear()
flash('You have been logged out', 'info')
return redirect(url_for('index'))
@app.route('/dashboard')
@login_required
def dashboard():
conn = get_db_connection()
# Get document counts by category
category_counts = conn.execute('''
SELECT category, COUNT(*) as count
FROM documents
WHERE user_id = ? OR visibility = "public"
OR id IN (SELECT document_id FROM document_shares WHERE shared_with = ?)
GROUP BY category
''', (session['user_id'], session['user_id'])).fetchall()
# Get recent documents (limited to 5)
recent_documents = conn.execute('''
SELECT * FROM documents
WHERE user_id = ? OR visibility = "public"
OR id IN (SELECT document_id FROM document_shares WHERE shared_with = ?)
ORDER BY created_at DESC
LIMIT 5
''', (session['user_id'], session['user_id'])).fetchall()
# Get analytics data for admin
if session['role'] == 'admin':
analytics = conn.execute('''
SELECT documents.custom_filename, COUNT(analytics.id) as view_count
FROM documents
LEFT JOIN analytics ON documents.id = analytics.document_id
WHERE analytics.action = 'view'
GROUP BY documents.id
ORDER BY view_count DESC
LIMIT 5
''').fetchall()
# Get user activity
user_activity = conn.execute('''
SELECT users.username, COUNT(analytics.id) as action_count
FROM analytics
JOIN users ON analytics.user_id = users.id
GROUP BY analytics.user_id
ORDER BY action_count DESC
LIMIT 5
''').fetchall()
else:
analytics = None
user_activity = None
conn.close()
return render_template('dashboard.html',
recent_documents=recent_documents,
category_counts=category_counts,
analytics=analytics,
user_activity=user_activity)
@app.route('/upload', methods=['GET', 'POST'])
@login_required
def upload():
if request.method == 'POST':
# Check if the post request has the file part
if 'file' not in request.files:
flash('No file part', 'error')
return redirect(request.url)
file = request.files['file']
visibility = request.form.get('visibility', 'private')
custom_filename = request.form.get('custom_filename', '').strip()
category = request.form.get('category', 'general')
# If user does not select file, browser also
# submit an empty part without filename
if file.filename == '':
flash('No selected file', 'error')
return redirect(request.url)
if file and allowed_file(file.filename):
# Generate unique filename
original_filename = secure_filename(file.filename)
file_extension = original_filename.rsplit('.', 1)[1].lower()
unique_filename = f"{uuid.uuid4().hex}.{file_extension}"
# Use custom filename if provided, otherwise use original
if not custom_filename:
custom_filename = os.path.splitext(original_filename)[0]
# Save the file
file_path = os.path.join(app.config['UPLOAD_FOLDER'], unique_filename)
file.save(file_path)
# Get file size
file_size = os.path.getsize(file_path)
# Save to database
conn = get_db_connection()
cursor = conn.cursor()
cursor.execute('''
INSERT INTO documents (filename, original_filename, custom_filename, file_type, file_size, category, user_id, visibility)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
''', (unique_filename, original_filename, custom_filename, get_file_type(original_filename), file_size, category, session['user_id'], visibility))
document_id = cursor.lastrowid
# Add initial version
cursor.execute('''
INSERT INTO document_versions (document_id, filename, version_number)
VALUES (?, ?, ?)
''', (document_id, unique_filename, 1))
conn.commit()
conn.close()
flash('File successfully uploaded', 'success')
return redirect(url_for('files'))
else:
flash(f'Allowed file types are: {", ".join(ALLOWED_EXTENSIONS)}', 'error')
# Get available categories
categories = ['admin', 'accounting', 'hr', 'marketing', 'legal', 'general', 'other']
return render_template('upload.html', categories=categories)
@app.route('/files')
@login_required
def files():
# Get query parameters
search_query = request.args.get('search', '')
selected_category = request.args.get('category', 'all')
page = request.args.get('page', 1, type=int)
per_page = 10 # Number of documents per page
conn = get_db_connection()
# Base query
query = '''
SELECT * FROM documents
WHERE (user_id = ? OR visibility = "public"
OR id IN (SELECT document_id FROM document_shares WHERE shared_with = ?))
'''
params = [session['user_id'], session['user_id']]
# Add search condition if search query provided
if search_query:
query += ' AND (custom_filename LIKE ? OR original_filename LIKE ?)'
params.extend(['%' + search_query + '%', '%' + search_query + '%'])
# Add category filter if specific category selected
if selected_category != 'all':
query += ' AND category = ?'
params.append(selected_category)
# Count total matching documents
count_query = query.replace('SELECT *', 'SELECT COUNT(*)')
total_docs = conn.execute(count_query, params).fetchone()[0]
# Add pagination
query += ' ORDER BY created_at DESC LIMIT ? OFFSET ?'
offset = (page - 1) * per_page
params.extend([per_page, offset])
# Execute final query
documents = conn.execute(query, params).fetchall()
# Get available categories for filter dropdown
categories = ['admin', 'accounting', 'hr', 'marketing', 'legal', 'general', 'other']
# Calculate pagination info
total_pages = (total_docs + per_page - 1) // per_page # Ceiling division
pagination = {
'page': page,
'per_page': per_page,
'total': total_docs,
'pages': total_pages
}
conn.close()
return render_template('files.html',
documents=documents,
categories=categories,
selected_category=selected_category,
search_query=search_query,
pagination=pagination)
@app.route('/document/<int:document_id>/edit', methods=['GET', 'POST'])
@login_required
def edit_document(document_id):
conn = get_db_connection()
# Get document
document = conn.execute('SELECT * FROM documents WHERE id = ?', (document_id,)).fetchone()
if not document:
conn.close()
flash('Document not found', 'error')
return redirect(url_for('files'))
# Check ownership
if document['user_id'] != session['user_id'] and session['role'] != 'admin':
conn.close()
flash('You do not have permission to edit this document', 'error')
return redirect(url_for('files'))
# Get available categories
categories = ['admin', 'accounting', 'hr', 'marketing', 'legal', 'general', 'other']
if request.method == 'POST':
custom_filename = request.form.get('custom_filename', '').strip()
category = request.form.get('category', 'general')
visibility = request.form.get('visibility', 'private')
# Use custom filename if provided, otherwise keep existing
if not custom_filename:
custom_filename = document['custom_filename']
# Update document
conn.execute('''
UPDATE documents
SET custom_filename = ?, category = ?, visibility = ?, updated_at = CURRENT_TIMESTAMP
WHERE id = ?
''', (custom_filename, category, visibility, document_id))
conn.commit()
conn.close()
flash('Document updated successfully', 'success')
return redirect(url_for('view_document', document_id=document_id))
conn.close()
return render_template('edit_document.html', document=document, categories=categories)
@app.route('/document/<int:document_id>')
@login_required
def view_document(document_id):
conn = get_db_connection()
# Get document
document = conn.execute('SELECT * FROM documents WHERE id = ?', (document_id,)).fetchone()
if not document:
conn.close()
flash('Document not found', 'error')
return redirect(url_for('dashboard'))
# Check permissions
if document['visibility'] != 'public' and document['user_id'] != session['user_id']:
# Check if shared with user
shared = conn.execute('''
SELECT * FROM document_shares
WHERE document_id = ? AND shared_with = ?
''', (document_id, session['user_id'])).fetchone()
if not shared and session['role'] != 'admin':
conn.close()
flash('You do not have permission to view this document', 'error')
return redirect(url_for('dashboard'))
# Record view in analytics
conn.execute('''
INSERT INTO analytics (document_id, action, user_id, ip_address)
VALUES (?, ?, ?, ?)
''', (document_id, 'view', session['user_id'], request.remote_addr))
# Get document versions
versions = conn.execute('''
SELECT * FROM document_versions
WHERE document_id = ?
ORDER BY version_number DESC
''', (document_id,)).fetchall()
# Get share information
shares = conn.execute('''
SELECT document_shares.*, users.username
FROM document_shares
LEFT JOIN users ON document_shares.shared_with = users.id
WHERE document_id = ?
''', (document_id,)).fetchall()
conn.commit()
conn.close()
return render_template('view_document.html', document=document, versions=versions, shares=shares)
@app.route('/document/<int:document_id>/file')
@login_required
def serve_document_file(document_id):
conn = get_db_connection()
# Get document
document = conn.execute('SELECT * FROM documents WHERE id = ?', (document_id,)).fetchone()
if not document:
conn.close()
flash('Document not found', 'error')
return redirect(url_for('dashboard'))
# Check permissions
if document['visibility'] != 'public' and document['user_id'] != session['user_id']:
shared = conn.execute('''
SELECT * FROM document_shares
WHERE document_id = ? AND shared_with = ?
''', (document_id, session['user_id'])).fetchone()
if not shared and session['role'] != 'admin':
conn.close()
flash('You do not have permission to view this document', 'error')
return redirect(url_for('dashboard'))
try:
# Serve the document without download (viewable in browser)
return send_from_directory(app.config['UPLOAD_FOLDER'], document['filename'],
as_attachment=False, download_name=document['original_filename'])
except FileNotFoundError:
flash('Document file not found', 'error')
conn.close()
return redirect(url_for('dashboard'))
finally:
conn.close()
@app.route('/document/<int:document_id>/download')
@login_required
def download_document(document_id):
conn = get_db_connection()
# Get document
document = conn.execute('SELECT * FROM documents WHERE id = ?', (document_id,)).fetchone()
if not document:
conn.close()
flash('Document not found', 'error')
return redirect(url_for('dashboard'))
# Check permissions
if document['visibility'] != 'public' and document['user_id'] != session['user_id']:
# Check if shared with user
shared = conn.execute('''
SELECT * FROM document_shares
WHERE document_id = ? AND shared_with = ?
''', (document_id, session['user_id'])).fetchone()
if not shared and session['role'] != 'admin':
conn.close()
flash('You do not have permission to download this document', 'error')
return redirect(url_for('dashboard'))
# Record download in analytics
conn.execute('''
INSERT INTO analytics (document_id, action, user_id, ip_address)
VALUES (?, ?, ?, ?)
''', (document_id, 'download', session['user_id'], request.remote_addr))
conn.commit()
conn.close()
return send_from_directory(app.config['UPLOAD_FOLDER'], document['filename'],
as_attachment=True, download_name=document['original_filename'])
@app.route('/document/<int:document_id>/share', methods=['GET', 'POST'])
@login_required
def share_document(document_id):
conn = get_db_connection()
# Get document
document = conn.execute('SELECT * FROM documents WHERE id = ?', (document_id,)).fetchone()
if not document:
conn.close()
flash('Document not found', 'error')
return redirect(url_for('dashboard'))
# Check ownership
if document['user_id'] != session['user_id'] and session['role'] != 'admin':
conn.close()
flash('You do not have permission to share this document', 'error')
return redirect(url_for('dashboard'))
if request.method == 'POST':
share_type = request.form.get('share_type')
if share_type == 'user':
# Share with specific user
username = request.form.get('username')
user = conn.execute('SELECT * FROM users WHERE username = ?', (username,)).fetchone()
if not user:
flash(f'User {username} not found', 'error')
else:
# Check if already shared
existing = conn.execute('''
SELECT * FROM document_shares
WHERE document_id = ? AND shared_with = ?
''', (document_id, user['id'])).fetchone()
if existing:
flash(f'Document already shared with {username}', 'info')
else:
conn.execute('''
INSERT INTO document_shares (document_id, shared_by, shared_with)
VALUES (?, ?, ?)
''', (document_id, session['user_id'], user['id']))
flash(f'Document shared with {username}', 'success')
elif share_type == 'link':
# Generate shareable link
expiry_days = int(request.form.get('expiry', 7))
expires_at = datetime.datetime.now() + datetime.timedelta(days=expiry_days)
share_token = uuid.uuid4().hex
conn.execute('''
INSERT INTO document_shares (document_id, shared_by, share_link, expires_at)
VALUES (?, ?, ?, ?)
''', (document_id, session['user_id'], share_token, expires_at))
flash(f'Shareable link created (expires in {expiry_days} days)', 'success')
conn.commit()
# Get users for sharing
users = conn.execute('SELECT * FROM users WHERE id != ?', (session['user_id'],)).fetchall()
# Get existing shares
shares = conn.execute('''
SELECT document_shares.*, users.username
FROM document_shares
LEFT JOIN users ON document_shares.shared_with = users.id
WHERE document_id = ?
''', (document_id,)).fetchall()
conn.close()
return render_template('share_document.html', document=document, users=users, shares=shares)
@app.route('/document/<int:document_id>/delete')
@login_required
def delete_document(document_id):
conn = get_db_connection()
# Get document
document = conn.execute('SELECT * FROM documents WHERE id = ?', (document_id,)).fetchone()
if not document:
conn.close()
flash('Document not found', 'error')
return redirect(url_for('dashboard'))
# Check ownership
if document['user_id'] != session['user_id'] and session['role'] != 'admin':
conn.close()
flash('You do not have permission to delete this document', 'error')
return redirect(url_for('dashboard'))
# Get all versions
versions = conn.execute('SELECT * FROM document_versions WHERE document_id = ?', (document_id,)).fetchall()
# Delete files
for version in versions:
file_path = os.path.join(app.config['UPLOAD_FOLDER'], version['filename'])
if os.path.exists(file_path):
os.remove(file_path)
# Delete from database (cascade will handle related records)
conn.execute('DELETE FROM analytics WHERE document_id = ?', (document_id,))
conn.execute('DELETE FROM document_shares WHERE document_id = ?', (document_id,))
conn.execute('DELETE FROM document_versions WHERE document_id = ?', (document_id,))
conn.execute('DELETE FROM documents WHERE id = ?', (document_id,))
conn.commit()
conn.close()
flash('Document deleted successfully', 'success')
return redirect(url_for('dashboard'))
@app.route('/admin')
@admin_required
def admin_panel():
conn = get_db_connection()
# Get all users
users = conn.execute('SELECT * FROM users').fetchall()
# Get document statistics
doc_stats = conn.execute('''
SELECT
COUNT(*) as total_documents,
SUM(file_size) as total_size,
COUNT(DISTINCT user_id) as total_contributors
FROM documents
''').fetchone()
# Get activity statistics
activity_stats = conn.execute('''
SELECT
COUNT(*) as total_actions,
COUNT(DISTINCT user_id) as active_users,
COUNT(DISTINCT document_id) as accessed_documents
FROM analytics
''').fetchone()
conn.close()
return render_template('admin.html', users=users, doc_stats=doc_stats, activity_stats=activity_stats)
@app.route('/shared/<share_token>')
def access_shared(share_token):
conn = get_db_connection()
# Get share information
share = conn.execute('''
SELECT document_shares.*, documents.*
FROM document_shares
JOIN documents ON document_shares.document_id = documents.id
WHERE document_shares.share_link = ?
''', (share_token,)).fetchone()
if not share:
conn.close()
flash('Invalid or expired share link', 'error')
return redirect(url_for('index'))
# Check if expired
if share['expires_at'] and datetime.datetime.strptime(share['expires_at'], '%Y-%m-%d %H:%M:%S.%f') < datetime.datetime.now():
conn.close()
flash('This share link has expired', 'error')
return redirect(url_for('index'))
# Record view in analytics
user_id = session.get('user_id')
conn.execute('''
INSERT INTO analytics (document_id, action, user_id, ip_address)
VALUES (?, ?, ?, ?)
''', (share['document_id'], 'shared_view', user_id, request.remote_addr))
conn.commit()
# Get document versions
versions = conn.execute('''
SELECT * FROM document_versions
WHERE document_id = ?
ORDER BY version_number DESC
''', (share['document_id'],)).fetchall()
conn.close()
return render_template('shared_view.html', document=share, versions=versions, share_token=share_token)
@app.route('/shared/<share_token>/download')
def download_shared(share_token):
conn = get_db_connection()
# Get share information
share = conn.execute('''
SELECT document_shares.*, documents.*
FROM document_shares
JOIN documents ON document_shares.document_id = documents.id
WHERE document_shares.share_link = ?
''', (share_token,)).fetchone()
if not share:
conn.close()
flash('Invalid or expired share link', 'error')
return redirect(url_for('index'))
# Check if expired
if share['expires_at'] and datetime.datetime.strptime(share['expires_at'], '%Y-%m-%d %H:%M:%S.%f') < datetime.datetime.now():
conn.close()
flash('This share link has expired', 'error')
return redirect(url_for('index'))
# Record download in analytics
user_id = session.get('user_id')
conn.execute('''
INSERT INTO analytics (document_id, action, user_id, ip_address)
VALUES (?, ?, ?, ?)
''', (share['document_id'], 'shared_download', user_id, request.remote_addr))
conn.commit()
conn.close()
return send_from_directory(app.config['UPLOAD_FOLDER'], share['filename'],
as_attachment=True, download_name=share['original_filename'])
# Error handlers
@app.errorhandler(404)
def page_not_found(e):
return render_template('error.html', error_code=404, error_message="Page not found"), 404
@app.errorhandler(500)
def internal_server_error(e):
return render_template('error.html', error_code=500, error_message="Internal server error"), 500
# Initialize database and start app
if __name__ == '__main__':
init_db()
app.run(host='0.0.0.0', port=1000, debug=False)
else:
# For production WSGI servers
init_db()