# main.py (полная версия с тегами и админкой) from fastapi import FastAPI, HTTPException, Depends, status, Request, Form from fastapi.middleware.cors import CORSMiddleware from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials from fastapi.responses import HTMLResponse, FileResponse, RedirectResponse from fastapi.staticfiles import StaticFiles from pydantic import BaseModel, EmailStr from typing import Optional, List from datetime import datetime, timedelta import jwt import sqlite3 import hashlib import os from contextlib import contextmanager import uvicorn from pathlib import Path app = FastAPI(title="Rabota.Today API", version="2.0.0") # Настройки SECRET_KEY = "your-secret-key-here-change-in-production" ALGORITHM = "HS256" ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 * 7 # 7 дней # CORS app.add_middleware( CORSMiddleware, allow_origins=["*"], allow_credentials=True, allow_methods=["*"], allow_headers=["*"], ) security = HTTPBearer() # Создаем директории BASE_DIR = Path(__file__).parent TEMPLATES_DIR = BASE_DIR / "templates" STATIC_DIR = BASE_DIR / "static" TEMPLATES_DIR.mkdir(exist_ok=True) STATIC_DIR.mkdir(exist_ok=True) (STATIC_DIR / "css").mkdir(exist_ok=True) (STATIC_DIR / "js").mkdir(exist_ok=True) # Подключаем статические файлы app.mount("/static", StaticFiles(directory=str(STATIC_DIR)), name="static") # ========== МОДЕЛИ БАЗЫ ДАННЫХ ========== def init_db(): """Инициализация базы данных""" with get_db() as conn: cursor = conn.cursor() # Таблица пользователей cursor.execute(""" CREATE TABLE IF NOT EXISTS users ( id INTEGER PRIMARY KEY AUTOINCREMENT, full_name TEXT NOT NULL, email TEXT UNIQUE NOT NULL, phone TEXT NOT NULL, telegram TEXT, password_hash TEXT NOT NULL, role TEXT NOT NULL CHECK(role IN ('employee', 'employer', 'admin')), is_admin BOOLEAN DEFAULT 0, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) # Таблица вакансий cursor.execute(""" CREATE TABLE IF NOT EXISTS vacancies ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, title TEXT NOT NULL, salary TEXT, description TEXT, contact TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, is_active BOOLEAN DEFAULT 1, views INTEGER DEFAULT 0, FOREIGN KEY (user_id) REFERENCES users (id) ) """) # Таблица резюме cursor.execute(""" CREATE TABLE IF NOT EXISTS resumes ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER UNIQUE NOT NULL, desired_position TEXT, about_me TEXT, desired_salary TEXT, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, views INTEGER DEFAULT 0, FOREIGN KEY (user_id) REFERENCES users (id) ) """) # Таблица тегов cursor.execute(""" CREATE TABLE IF NOT EXISTS tags ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT UNIQUE NOT NULL, category TEXT CHECK(category IN ('skill', 'industry', 'position', 'other')) DEFAULT 'skill', created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ) """) # Связь тегов с вакансиями cursor.execute(""" CREATE TABLE IF NOT EXISTS vacancy_tags ( vacancy_id INTEGER NOT NULL, tag_id INTEGER NOT NULL, PRIMARY KEY (vacancy_id, tag_id), FOREIGN KEY (vacancy_id) REFERENCES vacancies (id) ON DELETE CASCADE, FOREIGN KEY (tag_id) REFERENCES tags (id) ON DELETE CASCADE ) """) # Связь тегов с резюме cursor.execute(""" CREATE TABLE IF NOT EXISTS resume_tags ( resume_id INTEGER NOT NULL, tag_id INTEGER NOT NULL, PRIMARY KEY (resume_id, tag_id), FOREIGN KEY (resume_id) REFERENCES resumes (id) ON DELETE CASCADE, FOREIGN KEY (tag_id) REFERENCES tags (id) ON DELETE CASCADE ) """) # Таблица опыта работы cursor.execute(""" CREATE TABLE IF NOT EXISTS work_experience ( id INTEGER PRIMARY KEY AUTOINCREMENT, resume_id INTEGER NOT NULL, position TEXT NOT NULL, company TEXT NOT NULL, period TEXT, FOREIGN KEY (resume_id) REFERENCES resumes (id) ON DELETE CASCADE ) """) # Таблица образования cursor.execute(""" CREATE TABLE IF NOT EXISTS education ( id INTEGER PRIMARY KEY AUTOINCREMENT, resume_id INTEGER NOT NULL, institution TEXT NOT NULL, specialty TEXT, graduation_year TEXT, FOREIGN KEY (resume_id) REFERENCES resumes (id) ON DELETE CASCADE ) """) # Таблица откликов cursor.execute(""" CREATE TABLE IF NOT EXISTS applications ( id INTEGER PRIMARY KEY AUTOINCREMENT, vacancy_id INTEGER NOT NULL, user_id INTEGER NOT NULL, message TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, status TEXT DEFAULT 'pending', FOREIGN KEY (vacancy_id) REFERENCES vacancies (id), FOREIGN KEY (user_id) REFERENCES users (id) ) """) # Таблица компаний (для работодателей) cursor.execute(""" CREATE TABLE IF NOT EXISTS companies ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER UNIQUE NOT NULL, name TEXT NOT NULL, description TEXT, website TEXT, logo TEXT, address TEXT, phone TEXT, email TEXT, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE ) """) # Таблица избранного (favorites) cursor.execute(""" CREATE TABLE IF NOT EXISTS favorites ( id INTEGER PRIMARY KEY AUTOINCREMENT, user_id INTEGER NOT NULL, item_type TEXT NOT NULL CHECK(item_type IN ('vacancy', 'resume')), item_id INTEGER NOT NULL, created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, UNIQUE(user_id, item_type, item_id), FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE ) """) # Создаем индексы для быстрого поиска cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_favorites_user ON favorites(user_id) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_favorites_item ON favorites(item_type, item_id) """) # Таблица откликов (обновленная версия) cursor.execute(""" CREATE TABLE IF NOT EXISTS applications ( id INTEGER PRIMARY KEY AUTOINCREMENT, vacancy_id INTEGER NOT NULL, user_id INTEGER NOT NULL, message TEXT, status TEXT DEFAULT 'pending' CHECK(status IN ('pending', 'viewed', 'accepted', 'rejected')), created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP, viewed_at TIMESTAMP, response_message TEXT, response_at TIMESTAMP, FOREIGN KEY (vacancy_id) REFERENCES vacancies (id) ON DELETE CASCADE, FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE ) """) # Создаем индексы для быстрого поиска cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_applications_vacancy ON applications(vacancy_id) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_applications_user ON applications(user_id) """) cursor.execute(""" CREATE INDEX IF NOT EXISTS idx_applications_status ON applications(status) """) # Создаем админа по умолчанию admin_password = hash_password("admin123") cursor.execute(""" INSERT OR IGNORE INTO users (full_name, email, phone, password_hash, role, is_admin) VALUES ('Admin', 'admin@rabota.today', '+70000000000', ?, 'admin', 1) """, (admin_password,)) # Добавляем популярные теги default_tags = [ ('Python', 'skill'), ('JavaScript', 'skill'), ('React', 'skill'), ('Vue.js', 'skill'), ('Node.js', 'skill'), ('Java', 'skill'), ('C++', 'skill'), ('PHP', 'skill'), ('HTML/CSS', 'skill'), ('SQL', 'skill'), ('Менеджер', 'position'), ('Разработчик', 'position'), ('Дизайнер', 'position'), ('Маркетолог', 'position'), ('Аналитик', 'position'), ('IT', 'industry'), ('Финансы', 'industry'), ('Медицина', 'industry'), ('Образование', 'industry'), ('Продажи', 'industry'), ] for tag_name, category in default_tags: cursor.execute("INSERT OR IGNORE INTO tags (name, category) VALUES (?, ?)", (tag_name, category)) conn.commit() print("✅ База данных инициализирована") def set_initial_sequence_ids(): """Установка начальных ID для таблиц при первом запуске""" with get_db() as conn: cursor = conn.cursor() # Проверяем, есть ли уже записи в таблицах cursor.execute("SELECT COUNT(*) FROM vacancies") vacancies_count = cursor.fetchone()[0] cursor.execute("SELECT COUNT(*) FROM resumes") resumes_count = cursor.fetchone()[0] # Если таблицы пустые, устанавливаем начальные ID if vacancies_count == 0: cursor.execute("INSERT OR REPLACE INTO sqlite_sequence (name, seq) VALUES ('vacancies', 9999)") print("✅ Установлен начальный ID для вакансий: 10000") if resumes_count == 0: cursor.execute("INSERT OR REPLACE INTO sqlite_sequence (name, seq) VALUES ('resumes', 9999)") print("✅ Установлен начальный ID для резюме: 10000") conn.commit() @contextmanager def get_db(): """Контекстный менеджер для работы с БД""" conn = sqlite3.connect("rabota_today.db") conn.row_factory = sqlite3.Row try: yield conn finally: conn.close() # ========== МОДЕЛИ PYDANTIC ========== class UserRegister(BaseModel): full_name: str email: EmailStr phone: str telegram: Optional[str] = None password: str role: str class UserLogin(BaseModel): email: EmailStr password: str class TokenResponse(BaseModel): access_token: str token_type: str user_id: int full_name: str role: str is_admin: bool class Tag(BaseModel): id: Optional[int] = None name: str category: str = "skill" class VacancyCreate(BaseModel): title: str salary: Optional[str] = None description: Optional[str] = None contact: Optional[str] = None tags: List[str] = [] # список названий тегов class VacancyResponse(BaseModel): id: int title: str salary: Optional[str] description: Optional[str] contact: Optional[str] created_at: str is_active: bool views: int tags: List[Tag] = [] company_name: Optional[str] = None class WorkExperience(BaseModel): position: str company: str period: Optional[str] = None class Education(BaseModel): institution: str specialty: Optional[str] = None graduation_year: Optional[str] = None class ResumeCreate(BaseModel): desired_position: Optional[str] = None about_me: Optional[str] = None desired_salary: Optional[str] = None work_experience: List[WorkExperience] = [] education: List[Education] = [] tags: List[str] = [] # список названий тегов class ResumeResponse(BaseModel): id: int user_id: int desired_position: Optional[str] about_me: Optional[str] desired_salary: Optional[str] work_experience: List[WorkExperience] education: List[Education] tags: List[Tag] = [] views: int updated_at: str full_name: Optional[str] = None class CompanyCreate(BaseModel): name: str description: Optional[str] = None website: Optional[str] = None logo: Optional[str] = None address: Optional[str] = None phone: Optional[str] = None email: Optional[str] = None class CompanyResponse(BaseModel): id: int user_id: int name: str description: Optional[str] website: Optional[str] logo: Optional[str] address: Optional[str] phone: Optional[str] email: Optional[str] created_at: str updated_at: str class FavoriteCreate(BaseModel): item_type: str # 'vacancy' или 'resume' item_id: int class FavoriteResponse(BaseModel): id: int user_id: int item_type: str item_id: int created_at: str # Для расширенной информации item_data: Optional[dict] = None class FavoriteItem(BaseModel): id: int type: str title: str subtitle: str salary: Optional[str] = None company: Optional[str] = None name: Optional[str] = None tags: List[str] = [] created_at: str url: str class ApplicationCreate(BaseModel): vacancy_id: int message: Optional[str] = None class ApplicationResponse(BaseModel): id: int vacancy_id: int user_id: int message: Optional[str] status: str created_at: str viewed_at: Optional[str] response_message: Optional[str] response_at: Optional[str] # Данные о вакансии vacancy_title: Optional[str] = None company_name: Optional[str] = None # Данные о соискателе user_name: Optional[str] = None user_email: Optional[str] = None user_phone: Optional[str] = None user_telegram: Optional[str] = None class ApplicationStatusUpdate(BaseModel): status: str # 'viewed', 'accepted', 'rejected' response_message: Optional[str] = None class ApplicationStats(BaseModel): total: int pending: int viewed: int accepted: int rejected: int # ========== ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ ========== def hash_password(password: str) -> str: """Хеширование пароля""" salt = "rabota.today.salt" return hashlib.sha256((password + salt).encode()).hexdigest() def verify_password(password: str, hash: str) -> bool: """Проверка пароля""" return hash_password(password) == hash def create_access_token(data: dict): """Создание JWT токена""" to_encode = data.copy() expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES) to_encode.update({"exp": expire}) return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM) def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)): """Получение текущего пользователя из токена""" token = credentials.credentials try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) user_id = payload.get("sub") if user_id is None: raise HTTPException(status_code=401, detail="Невалидный токен") return int(user_id) except jwt.PyJWTError: raise HTTPException(status_code=401, detail="Невалидный токен") def get_current_user_optional(token: Optional[str] = None): """Опциональное получение пользователя (для публичных страниц)""" if not token: return None try: payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM]) return int(payload.get("sub")) except: return None def get_tags_for_vacancy(cursor, vacancy_id): """Получение тегов для вакансии""" cursor.execute(""" SELECT t.* FROM tags t JOIN vacancy_tags vt ON t.id = vt.tag_id WHERE vt.vacancy_id = ? """, (vacancy_id,)) return [dict(tag) for tag in cursor.fetchall()] def get_tags_for_resume(cursor, resume_id): """Получение тегов для резюме""" cursor.execute(""" SELECT t.* FROM tags t JOIN resume_tags rt ON t.id = rt.tag_id WHERE rt.resume_id = ? """, (resume_id,)) return [dict(tag) for tag in cursor.fetchall()] def add_tags_to_vacancy(cursor, vacancy_id, tag_names): """Добавление тегов к вакансии""" for tag_name in tag_names: # Ищем или создаем тег cursor.execute("SELECT id FROM tags WHERE name = ?", (tag_name,)) tag = cursor.fetchone() if tag: tag_id = tag["id"] else: cursor.execute("INSERT INTO tags (name) VALUES (?)", (tag_name,)) tag_id = cursor.lastrowid # Связываем тег с вакансией cursor.execute(""" INSERT OR IGNORE INTO vacancy_tags (vacancy_id, tag_id) VALUES (?, ?) """, (vacancy_id, tag_id)) def add_tags_to_resume(cursor, resume_id, tag_names): """Добавление тегов к резюме""" for tag_name in tag_names: cursor.execute("SELECT id FROM tags WHERE name = ?", (tag_name,)) tag = cursor.fetchone() if tag: tag_id = tag["id"] else: cursor.execute("INSERT INTO tags (name) VALUES (?)", (tag_name,)) tag_id = cursor.lastrowid cursor.execute(""" INSERT OR IGNORE INTO resume_tags (resume_id, tag_id) VALUES (?, ?) """, (resume_id, tag_id)) # ========== HTML СТРАНИЦЫ ========== @app.get("/", response_class=HTMLResponse) async def get_index(): """Главная страница""" file_path = TEMPLATES_DIR / "index.html" if not file_path.exists(): return HTMLResponse( content="

Rabota.Today API

Сервер работает. Создайте файл templates/index.html

") return FileResponse(file_path) @app.get("/login", response_class=HTMLResponse) async def get_login(): """Страница входа""" file_path = TEMPLATES_DIR / "login.html" return FileResponse(file_path) if file_path.exists() else HTMLResponse( content="

Страница входа

Создайте файл templates/login.html

") @app.get("/register", response_class=HTMLResponse) async def get_register_page(): """Отдельная страница регистрации""" file_path = TEMPLATES_DIR / "register.html" if not file_path.exists(): return HTMLResponse(content="

Страница регистрации

Создайте файл templates/register.html

") return FileResponse(file_path) @app.get("/register", response_class=HTMLResponse) async def get_register(): """Страница регистрации""" file_path = TEMPLATES_DIR / "register.html" return FileResponse(file_path) if file_path.exists() else HTMLResponse( content="

Страница регистрации

Создайте файл templates/register.html

") @app.get("/profile", response_class=HTMLResponse) async def get_profile(): """Страница профиля""" file_path = TEMPLATES_DIR / "profile.html" return FileResponse(file_path) if file_path.exists() else HTMLResponse( content="

Профиль

Создайте файл templates/profile.html

") @app.get("/vacancies", response_class=HTMLResponse) async def get_vacancies(): """Страница со списком вакансий""" file_path = TEMPLATES_DIR / "vacancies.html" return FileResponse(file_path) if file_path.exists() else HTMLResponse( content="

Вакансии

Создайте файл templates/vacancies.html

") @app.get("/vacancy/{vacancy_id}", response_class=HTMLResponse) async def get_vacancy_detail(vacancy_id: int): """Страница конкретной вакансии""" file_path = TEMPLATES_DIR / "vacancy_detail.html" return FileResponse(file_path) if file_path.exists() else HTMLResponse( content=f"

Вакансия #{vacancy_id}

Создайте файл templates/vacancy_detail.html

") @app.get("/resumes", response_class=HTMLResponse) async def get_resumes(): """Страница со списком резюме""" file_path = TEMPLATES_DIR / "resumes.html" return FileResponse(file_path) if file_path.exists() else HTMLResponse( content="

Резюме

Создайте файл templates/resumes.html

") @app.get("/resume/{resume_id}", response_class=HTMLResponse) async def get_resume_detail(resume_id: int): """Страница конкретного резюме""" file_path = TEMPLATES_DIR / "resume_detail.html" return FileResponse(file_path) if file_path.exists() else HTMLResponse( content=f"

Резюме #{resume_id}

Создайте файл templates/resume_detail.html

") @app.get("/favorites", response_class=HTMLResponse) async def get_favorites_page(): """Страница избранного""" file_path = TEMPLATES_DIR / "favorites.html" return FileResponse(file_path) if file_path.exists() else HTMLResponse(content="

Избранное

Создайте файл templates/favorites.html

") @app.get("/applications", response_class=HTMLResponse) async def get_applications_page(): """Страница со списком откликов""" file_path = TEMPLATES_DIR / "applications.html" return FileResponse(file_path) if file_path.exists() else HTMLResponse(content="

Отклики

Создайте файл templates/applications.html

") @app.get("/application/{application_id}", response_class=HTMLResponse) async def get_application_page(application_id: int): """Детальная страница отклика""" file_path = TEMPLATES_DIR / "application_detail.html" return FileResponse(file_path) if file_path.exists() else HTMLResponse(content=f"

Отклик #{application_id}

Создайте файл templates/application_detail.html

") @app.get("/admin", response_class=HTMLResponse) async def get_admin(): """Страница админки""" file_path = TEMPLATES_DIR / "admin.html" return FileResponse(file_path) if file_path.exists() else HTMLResponse( content="

Админка

Создайте файл templates/admin.html

") # ========== API ЭНДПОИНТЫ ========== @app.post("/api/register", response_model=TokenResponse) async def register(user: UserRegister): """Регистрация нового пользователя""" with get_db() as conn: cursor = conn.cursor() cursor.execute("SELECT id FROM users WHERE email = ?", (user.email,)) if cursor.fetchone(): raise HTTPException(status_code=400, detail="Email уже зарегистрирован") password_hash = hash_password(user.password) is_admin = 1 if user.email == "admin@rabota.today" else 0 cursor.execute(""" INSERT INTO users (full_name, email, phone, telegram, password_hash, role, is_admin) VALUES (?, ?, ?, ?, ?, ?, ?) """, (user.full_name, user.email, user.phone, user.telegram, password_hash, user.role, is_admin)) user_id = cursor.lastrowid conn.commit() token = create_access_token({"sub": str(user_id), "role": user.role, "is_admin": is_admin}) return { "access_token": token, "token_type": "bearer", "user_id": user_id, "full_name": user.full_name, "role": user.role, "is_admin": bool(is_admin) } @app.post("/api/login", response_model=TokenResponse) async def login(user_data: UserLogin): """Вход в систему""" with get_db() as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM users WHERE email = ?", (user_data.email,)) user = cursor.fetchone() if not user or not verify_password(user_data.password, user["password_hash"]): raise HTTPException(status_code=401, detail="Неверный email или пароль") token = create_access_token({"sub": str(user["id"]), "role": user["role"], "is_admin": user["is_admin"]}) return { "access_token": token, "token_type": "bearer", "user_id": user["id"], "full_name": user["full_name"], "role": user["role"], "is_admin": bool(user["is_admin"]) } @app.get("/api/tags") async def get_all_tags(category: Optional[str] = None): """Получение всех тегов""" with get_db() as conn: cursor = conn.cursor() if category: cursor.execute("SELECT * FROM tags WHERE category = ? ORDER BY name", (category,)) else: cursor.execute("SELECT * FROM tags ORDER BY category, name") tags = cursor.fetchall() return [dict(tag) for tag in tags] @app.post("/api/tags") async def create_tag(tag: Tag, user_id: int = Depends(get_current_user)): """Создание нового тега (только для админа)""" with get_db() as conn: cursor = conn.cursor() # Проверяем, админ ли пользователь cursor.execute("SELECT is_admin FROM users WHERE id = ?", (user_id,)) user = cursor.fetchone() if not user or not user["is_admin"]: raise HTTPException(status_code=403, detail="Только администратор может создавать теги") try: cursor.execute("INSERT INTO tags (name, category) VALUES (?, ?)", (tag.name, tag.category)) conn.commit() tag_id = cursor.lastrowid cursor.execute("SELECT * FROM tags WHERE id = ?", (tag_id,)) return dict(cursor.fetchone()) except sqlite3.IntegrityError: raise HTTPException(status_code=400, detail="Тег с таким названием уже существует") # ========== ЭНДПОИНТЫ ДЛЯ ВАКАНСИЙ ========== @app.post("/api/vacancies", response_model=VacancyResponse) async def create_vacancy( vacancy: VacancyCreate, user_id: int = Depends(get_current_user) ): """Создание новой вакансии""" with get_db() as conn: cursor = conn.cursor() cursor.execute("SELECT role FROM users WHERE id = ?", (user_id,)) user = cursor.fetchone() if not user or user["role"] != "employer": raise HTTPException(status_code=403, detail="Только работодатели могут создавать вакансии") cursor.execute(""" INSERT INTO vacancies (user_id, title, salary, description, contact) VALUES (?, ?, ?, ?, ?) """, (user_id, vacancy.title, vacancy.salary, vacancy.description, vacancy.contact)) vacancy_id = cursor.lastrowid # Добавляем теги if vacancy.tags: add_tags_to_vacancy(cursor, vacancy_id, vacancy.tags) conn.commit() cursor.execute("SELECT * FROM vacancies WHERE id = ?", (vacancy_id,)) new_vacancy = dict(cursor.fetchone()) new_vacancy["tags"] = get_tags_for_vacancy(cursor, vacancy_id) return new_vacancy @app.get("/api/vacancies", response_model=List[VacancyResponse]) async def get_my_vacancies(user_id: int = Depends(get_current_user)): """Получение всех вакансий текущего пользователя с данными компании""" with get_db() as conn: cursor = conn.cursor() # Получаем компанию пользователя cursor.execute("SELECT name FROM companies WHERE user_id = ?", (user_id,)) company = cursor.fetchone() company_name = company["name"] if company else None cursor.execute(""" SELECT * FROM vacancies WHERE user_id = ? AND is_active = 1 ORDER BY created_at DESC """, (user_id,)) vacancies = cursor.fetchall() result = [] for v in vacancies: vacancy_dict = dict(v) # Добавляем название компании (либо из таблицы companies, либо имя пользователя) if company_name: vacancy_dict["company_name"] = company_name else: cursor.execute("SELECT full_name FROM users WHERE id = ?", (user_id,)) user = cursor.fetchone() vacancy_dict["company_name"] = user["full_name"] if user else None # Добавляем теги cursor.execute(""" SELECT t.* FROM tags t JOIN vacancy_tags vt ON t.id = vt.tag_id WHERE vt.vacancy_id = ? """, (v["id"],)) vacancy_dict["tags"] = [dict(tag) for tag in cursor.fetchall()] result.append(vacancy_dict) return result @app.get("/api/vacancies/all") async def get_all_vacancies( page: int = 1, search: str = None, tags: str = None, min_salary: int = None, sort: str = "newest" ): """Получение всех вакансий с фильтрацией по тегам и данными компаний""" with get_db() as conn: cursor = conn.cursor() query = """ SELECT DISTINCT v.*, COALESCE(c.name, u.full_name) as company_name FROM vacancies v JOIN users u ON v.user_id = u.id LEFT JOIN companies c ON v.user_id = c.user_id WHERE v.is_active = 1 """ params = [] if search: query += " AND (v.title LIKE ? OR v.description LIKE ?)" params.extend([f"%{search}%", f"%{search}%"]) if tags: tag_list = [t.strip() for t in tags.split(',')] placeholders = ','.join(['?'] * len(tag_list)) query += f""" AND v.id IN ( SELECT vt.vacancy_id FROM vacancy_tags vt JOIN tags t ON vt.tag_id = t.id WHERE t.name IN ({placeholders}) GROUP BY vt.vacancy_id HAVING COUNT(DISTINCT t.name) = ? ) """ params.extend(tag_list + [len(tag_list)]) if min_salary: query += " AND v.salary LIKE ?" params.append(f"%{min_salary}%") if sort == "salary_desc": query += " ORDER BY v.salary DESC" elif sort == "salary_asc": query += " ORDER BY v.salary ASC" else: query += " ORDER BY v.created_at DESC" # Пагинация limit = 10 offset = (page - 1) * limit query += " LIMIT ? OFFSET ?" params.extend([limit, offset]) cursor.execute(query, params) vacancies = cursor.fetchall() result = [] for v in vacancies: vacancy_dict = dict(v) # Добавляем теги для каждой вакансии cursor.execute(""" SELECT t.* FROM tags t JOIN vacancy_tags vt ON t.id = vt.tag_id WHERE vt.vacancy_id = ? """, (v["id"],)) vacancy_dict["tags"] = [dict(tag) for tag in cursor.fetchall()] result.append(vacancy_dict) # Получаем общее количество cursor.execute("SELECT COUNT(*) FROM vacancies WHERE is_active = 1") total = cursor.fetchone()[0] return { "vacancies": result, "total_pages": (total + limit - 1) // limit, "current_page": page } @app.get("/api/vacancies/{vacancy_id}") async def get_vacancy(vacancy_id: int, token: Optional[str] = None): """Получение конкретной вакансии с данными компании""" user_id = get_current_user_optional(token) with get_db() as conn: cursor = conn.cursor() # Увеличиваем счетчик просмотров cursor.execute("UPDATE vacancies SET views = views + 1 WHERE id = ?", (vacancy_id,)) # Получаем вакансию с данными компании cursor.execute(""" SELECT v.*, COALESCE(c.name, u.full_name) as company_name, c.description as company_description, c.website as company_website, c.logo as company_logo, c.address as company_address, c.phone as company_phone, c.email as company_email, u.email as user_email, u.telegram as user_telegram FROM vacancies v JOIN users u ON v.user_id = u.id LEFT JOIN companies c ON v.user_id = c.user_id WHERE v.id = ? """, (vacancy_id,)) vacancy = cursor.fetchone() if not vacancy: raise HTTPException(status_code=404, detail="Вакансия не найдена") result = dict(vacancy) # Добавляем теги cursor.execute(""" SELECT t.* FROM tags t JOIN vacancy_tags vt ON t.id = vt.tag_id WHERE vt.vacancy_id = ? """, (vacancy_id,)) result["tags"] = [dict(tag) for tag in cursor.fetchall()] # Проверяем, откликался ли пользователь if user_id: cursor.execute(""" SELECT id FROM applications WHERE vacancy_id = ? AND user_id = ? """, (vacancy_id, user_id)) result["has_applied"] = cursor.fetchone() is not None conn.commit() return result @app.post("/api/vacancies/{vacancy_id}/apply") async def apply_to_vacancy( vacancy_id: int, user_id: int = Depends(get_current_user) ): """Отклик на вакансию""" with get_db() as conn: cursor = conn.cursor() cursor.execute("SELECT id FROM vacancies WHERE id = ?", (vacancy_id,)) if not cursor.fetchone(): raise HTTPException(status_code=404, detail="Вакансия не найдена") cursor.execute(""" SELECT id FROM applications WHERE vacancy_id = ? AND user_id = ? """, (vacancy_id, user_id)) if cursor.fetchone(): raise HTTPException(status_code=400, detail="Вы уже откликались на эту вакансию") cursor.execute(""" INSERT INTO applications (vacancy_id, user_id) VALUES (?, ?) """, (vacancy_id, user_id)) conn.commit() return {"message": "Отклик отправлен"} # ========== ЭНДПОИНТЫ ДЛЯ РЕЗЮМЕ ========== @app.post("/api/resume", response_model=ResumeResponse) async def create_or_update_resume( resume: ResumeCreate, user_id: int = Depends(get_current_user) ): """Создание или обновление резюме""" with get_db() as conn: cursor = conn.cursor() cursor.execute("SELECT role FROM users WHERE id = ?", (user_id,)) user = cursor.fetchone() if not user or user["role"] != "employee": raise HTTPException(status_code=403, detail="Только соискатели могут создавать резюме") cursor.execute("SELECT id FROM resumes WHERE user_id = ?", (user_id,)) existing = cursor.fetchone() if existing: cursor.execute(""" UPDATE resumes SET desired_position = ?, about_me = ?, desired_salary = ?, updated_at = CURRENT_TIMESTAMP WHERE user_id = ? """, (resume.desired_position, resume.about_me, resume.desired_salary, user_id)) resume_id = existing["id"] cursor.execute("DELETE FROM work_experience WHERE resume_id = ?", (resume_id,)) cursor.execute("DELETE FROM education WHERE resume_id = ?", (resume_id,)) cursor.execute("DELETE FROM resume_tags WHERE resume_id = ?", (resume_id,)) else: cursor.execute(""" INSERT INTO resumes (user_id, desired_position, about_me, desired_salary) VALUES (?, ?, ?, ?) """, (user_id, resume.desired_position, resume.about_me, resume.desired_salary)) resume_id = cursor.lastrowid for exp in resume.work_experience: cursor.execute(""" INSERT INTO work_experience (resume_id, position, company, period) VALUES (?, ?, ?, ?) """, (resume_id, exp.position, exp.company, exp.period)) for edu in resume.education: cursor.execute(""" INSERT INTO education (resume_id, institution, specialty, graduation_year) VALUES (?, ?, ?, ?) """, (resume_id, edu.institution, edu.specialty, edu.graduation_year)) # Добавляем теги if resume.tags: add_tags_to_resume(cursor, resume_id, resume.tags) conn.commit() cursor.execute("SELECT * FROM resumes WHERE id = ?", (resume_id,)) resume_data = dict(cursor.fetchone()) cursor.execute("SELECT position, company, period FROM work_experience WHERE resume_id = ?", (resume_id,)) resume_data["work_experience"] = [dict(exp) for exp in cursor.fetchall()] cursor.execute("SELECT institution, specialty, graduation_year FROM education WHERE resume_id = ?", (resume_id,)) resume_data["education"] = [dict(edu) for edu in cursor.fetchall()] resume_data["tags"] = get_tags_for_resume(cursor, resume_id) return resume_data @app.get("/api/resume", response_model=ResumeResponse) async def get_my_resume(user_id: int = Depends(get_current_user)): """Получение резюме текущего пользователя""" with get_db() as conn: cursor = conn.cursor() cursor.execute("SELECT * FROM resumes WHERE user_id = ?", (user_id,)) resume = cursor.fetchone() if not resume: raise HTTPException(status_code=404, detail="Резюме не найдено") resume_data = dict(resume) cursor.execute("SELECT position, company, period FROM work_experience WHERE resume_id = ?", (resume["id"],)) resume_data["work_experience"] = [dict(exp) for exp in cursor.fetchall()] cursor.execute("SELECT institution, specialty, graduation_year FROM education WHERE resume_id = ?", (resume["id"],)) resume_data["education"] = [dict(edu) for edu in cursor.fetchall()] resume_data["tags"] = get_tags_for_resume(cursor, resume["id"]) return resume_data @app.get("/api/resumes/all") async def get_all_resumes( page: int = 1, search: str = None, tags: str = None, min_salary: int = None ): """Получение всех резюме с фильтрацией по тегам""" with get_db() as conn: cursor = conn.cursor() query = """ SELECT DISTINCT r.*, u.full_name FROM resumes r JOIN users u ON r.user_id = u.id WHERE 1=1 """ params = [] if search: query += " AND (r.desired_position LIKE ? OR r.about_me LIKE ?)" params.extend([f"%{search}%", f"%{search}%"]) if tags: tag_list = [t.strip() for t in tags.split(',')] placeholders = ','.join(['?'] * len(tag_list)) query += f""" AND r.id IN ( SELECT rt.resume_id FROM resume_tags rt JOIN tags t ON rt.tag_id = t.id WHERE t.name IN ({placeholders}) GROUP BY rt.resume_id HAVING COUNT(DISTINCT t.name) = ? ) """ params.extend(tag_list + [len(tag_list)]) if min_salary: query += " AND r.desired_salary LIKE ?" params.append(f"%{min_salary}%") # Пагинация limit = 10 offset = (page - 1) * limit query += " ORDER BY r.updated_at DESC LIMIT ? OFFSET ?" params.extend([limit, offset]) cursor.execute(query, params) resumes = cursor.fetchall() result = [] for r in resumes: resume_dict = dict(r) resume_dict["tags"] = get_tags_for_resume(cursor, r["id"]) cursor.execute("SELECT COUNT(*) FROM work_experience WHERE resume_id = ?", (r["id"],)) resume_dict["experience_count"] = cursor.fetchone()[0] result.append(resume_dict) cursor.execute("SELECT COUNT(*) FROM resumes") total = cursor.fetchone()[0] return { "resumes": result, "total_pages": (total + limit - 1) // limit, "current_page": page } @app.get("/api/resumes/{resume_id}") async def get_resume(resume_id: int): """Получение конкретного резюме""" with get_db() as conn: cursor = conn.cursor() cursor.execute(""" SELECT r.*, u.full_name, u.email, u.phone, u.telegram FROM resumes r JOIN users u ON r.user_id = u.id WHERE r.id = ? """, (resume_id,)) resume = cursor.fetchone() if not resume: raise HTTPException(status_code=404, detail="Резюме не найдено") # Увеличиваем счетчик просмотров cursor.execute("UPDATE resumes SET views = views + 1 WHERE id = ?", (resume_id,)) resume_data = dict(resume) cursor.execute("SELECT position, company, period FROM work_experience WHERE resume_id = ?", (resume_id,)) resume_data["work_experience"] = [dict(exp) for exp in cursor.fetchall()] cursor.execute("SELECT institution, specialty, graduation_year FROM education WHERE resume_id = ?", (resume_id,)) resume_data["education"] = [dict(edu) for edu in cursor.fetchall()] resume_data["tags"] = get_tags_for_resume(cursor, resume_id) conn.commit() return resume_data @app.get("/api/company", response_model=CompanyResponse) async def get_my_company(user_id: int = Depends(get_current_user)): """Получение информации о компании текущего пользователя""" with get_db() as conn: cursor = conn.cursor() # Проверяем, что пользователь - работодатель cursor.execute("SELECT role FROM users WHERE id = ?", (user_id,)) user = cursor.fetchone() if not user or user["role"] != "employer": raise HTTPException(status_code=403, detail="Только работодатели могут иметь компанию") cursor.execute("SELECT * FROM companies WHERE user_id = ?", (user_id,)) company = cursor.fetchone() if not company: # Если компании нет, возвращаем пустой объект с 404 raise HTTPException(status_code=404, detail="Компания не найдена") return dict(company) @app.post("/api/company", response_model=CompanyResponse) async def create_or_update_company( company: CompanyCreate, user_id: int = Depends(get_current_user) ): """Создание или обновление информации о компании""" with get_db() as conn: cursor = conn.cursor() # Проверяем, что пользователь - работодатель cursor.execute("SELECT role FROM users WHERE id = ?", (user_id,)) user = cursor.fetchone() if not user or user["role"] != "employer": raise HTTPException(status_code=403, detail="Только работодатели могут создавать компанию") # Проверяем существование компании cursor.execute("SELECT id FROM companies WHERE user_id = ?", (user_id,)) existing = cursor.fetchone() if existing: # Обновление существующей компании cursor.execute(""" UPDATE companies SET name = ?, description = ?, website = ?, logo = ?, address = ?, phone = ?, email = ?, updated_at = CURRENT_TIMESTAMP WHERE user_id = ? """, (company.name, company.description, company.website, company.logo, company.address, company.phone, company.email, user_id)) company_id = existing["id"] else: # Создание новой компании cursor.execute(""" INSERT INTO companies (user_id, name, description, website, logo, address, phone, email) VALUES (?, ?, ?, ?, ?, ?, ?, ?) """, (user_id, company.name, company.description, company.website, company.logo, company.address, company.phone, company.email)) company_id = cursor.lastrowid conn.commit() cursor.execute("SELECT * FROM companies WHERE id = ?", (company_id,)) return dict(cursor.fetchone()) @app.get("/api/companies/{company_id}") async def get_company_by_id(company_id: int): """Получение информации о компании по ID (публичный эндпоинт)""" with get_db() as conn: cursor = conn.cursor() cursor.execute(""" SELECT c.*, u.full_name as owner_name FROM companies c JOIN users u ON c.user_id = u.id WHERE c.id = ? """, (company_id,)) company = cursor.fetchone() if not company: raise HTTPException(status_code=404, detail="Компания не найдена") return dict(company) @app.get("/api/resumes/{resume_id}") async def get_resume(resume_id: int): """Получение конкретного резюме""" with get_db() as conn: cursor = conn.cursor() # Увеличиваем счетчик просмотров cursor.execute("UPDATE resumes SET views = views + 1 WHERE id = ?", (resume_id,)) cursor.execute(""" SELECT r.*, u.full_name, u.email, u.phone, u.telegram FROM resumes r JOIN users u ON r.user_id = u.id WHERE r.id = ? """, (resume_id,)) resume = cursor.fetchone() if not resume: raise HTTPException(status_code=404, detail="Резюме не найдено") resume_data = dict(resume) # Получаем опыт работы cursor.execute("SELECT position, company, period FROM work_experience WHERE resume_id = ?", (resume_id,)) resume_data["work_experience"] = [dict(exp) for exp in cursor.fetchall()] # Получаем образование cursor.execute("SELECT institution, specialty, graduation_year FROM education WHERE resume_id = ?", (resume_id,)) resume_data["education"] = [dict(edu) for edu in cursor.fetchall()] # Получаем теги cursor.execute(""" SELECT t.* FROM tags t JOIN resume_tags rt ON t.id = rt.tag_id WHERE rt.resume_id = ? """, (resume_id,)) resume_data["tags"] = [dict(tag) for tag in cursor.fetchall()] conn.commit() return resume_data # ========== ЭНДПОИНТЫ ДЛЯ ИЗБРАННОГО ========== @app.post("/api/favorites") async def add_to_favorites( favorite: FavoriteCreate, user_id: int = Depends(get_current_user) ): """Добавление в избранное""" with get_db() as conn: cursor = conn.cursor() # Проверяем существование элемента if favorite.item_type == 'vacancy': cursor.execute("SELECT id FROM vacancies WHERE id = ? AND is_active = 1", (favorite.item_id,)) else: # resume cursor.execute("SELECT id FROM resumes WHERE id = ?", (favorite.item_id,)) if not cursor.fetchone(): raise HTTPException(status_code=404, detail=f"{favorite.item_type} не найден") # Добавляем в избранное try: cursor.execute(""" INSERT INTO favorites (user_id, item_type, item_id) VALUES (?, ?, ?) """, (user_id, favorite.item_type, favorite.item_id)) conn.commit() return {"message": "Добавлено в избранное"} except sqlite3.IntegrityError: raise HTTPException(status_code=400, detail="Уже в избранном") @app.delete("/api/favorites") async def remove_from_favorites( favorite: FavoriteCreate, user_id: int = Depends(get_current_user) ): """Удаление из избранного""" with get_db() as conn: cursor = conn.cursor() cursor.execute(""" DELETE FROM favorites WHERE user_id = ? AND item_type = ? AND item_id = ? """, (user_id, favorite.item_type, favorite.item_id)) if cursor.rowcount == 0: raise HTTPException(status_code=404, detail="Не найдено в избранном") conn.commit() return {"message": "Удалено из избранного"} @app.get("/api/favorites") async def get_favorites( user_id: int = Depends(get_current_user), item_type: Optional[str] = None ): """Получение списка избранного""" with get_db() as conn: cursor = conn.cursor() query = "SELECT * FROM favorites WHERE user_id = ?" params = [user_id] if item_type: query += " AND item_type = ?" params.append(item_type) query += " ORDER BY created_at DESC" cursor.execute(query, params) favorites = cursor.fetchall() result = [] for fav in favorites: fav_dict = dict(fav) # Получаем дополнительные данные о элементе if fav["item_type"] == 'vacancy': cursor.execute(""" SELECT v.*, COALESCE(c.name, u.full_name) as company_name FROM vacancies v JOIN users u ON v.user_id = u.id LEFT JOIN companies c ON v.user_id = c.user_id WHERE v.id = ? """, (fav["item_id"],)) item = cursor.fetchone() if item: item_dict = dict(item) # Получаем теги вакансии cursor.execute(""" SELECT t.name FROM tags t JOIN vacancy_tags vt ON t.id = vt.tag_id WHERE vt.vacancy_id = ? """, (fav["item_id"],)) tags = [t["name"] for t in cursor.fetchall()] fav_dict["item_data"] = { "id": item["id"], "title": item["title"], "company": item_dict.get("company_name"), "salary": item["salary"], "tags": tags, "url": f"/vacancy/{item['id']}" } else: # resume cursor.execute(""" SELECT r.*, u.full_name FROM resumes r JOIN users u ON r.user_id = u.id WHERE r.id = ? """, (fav["item_id"],)) item = cursor.fetchone() if item: item_dict = dict(item) # Получаем теги резюме cursor.execute(""" SELECT t.name FROM tags t JOIN resume_tags rt ON t.id = rt.tag_id WHERE rt.resume_id = ? """, (fav["item_id"],)) tags = [t["name"] for t in cursor.fetchall()] fav_dict["item_data"] = { "id": item["id"], "title": item_dict.get("desired_position", "Резюме"), "name": item["full_name"], "salary": item["desired_salary"], "tags": tags, "url": f"/resume/{item['id']}" } result.append(fav_dict) return result @app.get("/api/favorites/check/{item_type}/{item_id}") async def check_favorite( item_type: str, item_id: int, user_id: int = Depends(get_current_user) ): """Проверка, находится ли элемент в избранном""" with get_db() as conn: cursor = conn.cursor() cursor.execute(""" SELECT id FROM favorites WHERE user_id = ? AND item_type = ? AND item_id = ? """, (user_id, item_type, item_id)) return {"is_favorite": cursor.fetchone() is not None} @app.get("/api/user/stats") async def get_user_stats(user_id: int = Depends(get_current_user)): """Получение статистики пользователя (количество избранного, просмотров и т.д.)""" with get_db() as conn: cursor = conn.cursor() stats = {} # Количество избранных вакансий cursor.execute(""" SELECT COUNT(*) FROM favorites WHERE user_id = ? AND item_type = 'vacancy' """, (user_id,)) stats["favorite_vacancies"] = cursor.fetchone()[0] # Количество избранных резюме cursor.execute(""" SELECT COUNT(*) FROM favorites WHERE user_id = ? AND item_type = 'resume' """, (user_id,)) stats["favorite_resumes"] = cursor.fetchone()[0] # Общее количество избранного stats["total_favorites"] = stats["favorite_vacancies"] + stats["favorite_resumes"] # Количество просмотров профиля (если есть резюме) cursor.execute(""" SELECT SUM(views) FROM resumes WHERE user_id = ? """, (user_id,)) stats["profile_views"] = cursor.fetchone()[0] or 0 # Количество созданных вакансий (для работодателя) cursor.execute(""" SELECT COUNT(*) FROM vacancies WHERE user_id = ? AND is_active = 1 """, (user_id,)) stats["active_vacancies"] = cursor.fetchone()[0] return stats # ========== ЭНДПОИНТЫ ДЛЯ ОТКЛИКОВ ========== @app.post("/api/applications") async def create_application( application: ApplicationCreate, user_id: int = Depends(get_current_user) ): """Создание отклика на вакансию""" with get_db() as conn: cursor = conn.cursor() # Проверяем, что пользователь - соискатель cursor.execute("SELECT role FROM users WHERE id = ?", (user_id,)) user = cursor.fetchone() if not user or user["role"] != "employee": raise HTTPException(status_code=403, detail="Только соискатели могут откликаться на вакансии") # Проверяем существование вакансии cursor.execute(""" SELECT v.*, u.id as employer_id FROM vacancies v JOIN users u ON v.user_id = u.id WHERE v.id = ? AND v.is_active = 1 """, (application.vacancy_id,)) vacancy = cursor.fetchone() if not vacancy: raise HTTPException(status_code=404, detail="Вакансия не найдена") # Проверяем, не откликался ли уже cursor.execute(""" SELECT id FROM applications WHERE vacancy_id = ? AND user_id = ? """, (application.vacancy_id, user_id)) if cursor.fetchone(): raise HTTPException(status_code=400, detail="Вы уже откликались на эту вакансию") # Создаем отклик cursor.execute(""" INSERT INTO applications (vacancy_id, user_id, message) VALUES (?, ?, ?) """, (application.vacancy_id, user_id, application.message)) application_id = cursor.lastrowid conn.commit() # Получаем созданный отклик cursor.execute("SELECT * FROM applications WHERE id = ?", (application_id,)) new_application = dict(cursor.fetchone()) return new_application @app.get("/api/applications/received") async def get_received_applications( user_id: int = Depends(get_current_user), status: Optional[str] = None, page: int = 1, limit: int = 20 ): """Получение откликов на вакансии работодателя (входящие)""" with get_db() as conn: cursor = conn.cursor() # Проверяем, что пользователь - работодатель cursor.execute("SELECT role FROM users WHERE id = ?", (user_id,)) user = cursor.fetchone() if not user or user["role"] != "employer": raise HTTPException(status_code=403, detail="Только работодатели могут просматривать отклики") query = """ SELECT a.*, v.title as vacancy_title, v.salary as vacancy_salary, u.full_name as user_name, u.email as user_email, u.phone as user_phone, u.telegram as user_telegram FROM applications a JOIN vacancies v ON a.vacancy_id = v.id JOIN users u ON a.user_id = u.id WHERE v.user_id = ? """ params = [user_id] if status: query += " AND a.status = ?" params.append(status) # Пагинация offset = (page - 1) * limit query += " ORDER BY a.created_at DESC LIMIT ? OFFSET ?" params.extend([limit, offset]) cursor.execute(query, params) applications = cursor.fetchall() # Получаем общее количество count_query = "SELECT COUNT(*) FROM applications a JOIN vacancies v ON a.vacancy_id = v.id WHERE v.user_id = ?" count_params = [user_id] if status: count_query += " AND a.status = ?" count_params.append(status) cursor.execute(count_query, count_params) total = cursor.fetchone()[0] result = [] for app in applications: app_dict = dict(app) result.append(app_dict) return { "applications": result, "total": total, "page": page, "total_pages": (total + limit - 1) // limit } @app.get("/api/applications/sent") async def get_sent_applications( user_id: int = Depends(get_current_user), status: Optional[str] = None, page: int = 1, limit: int = 20 ): """Получение отправленных откликов (для соискателя)""" with get_db() as conn: cursor = conn.cursor() # Проверяем, что пользователь - соискатель cursor.execute("SELECT role FROM users WHERE id = ?", (user_id,)) user = cursor.fetchone() if not user or user["role"] != "employee": raise HTTPException(status_code=403, detail="Только соискатели могут просматривать свои отклики") query = """ SELECT a.*, v.title as vacancy_title, v.salary as vacancy_salary, COALESCE(c.name, u_emp.full_name) as company_name, u_emp.email as employer_email, u_emp.telegram as employer_telegram FROM applications a JOIN vacancies v ON a.vacancy_id = v.id JOIN users u_emp ON v.user_id = u_emp.id LEFT JOIN companies c ON u_emp.id = c.user_id WHERE a.user_id = ? """ params = [user_id] if status: query += " AND a.status = ?" params.append(status) # Пагинация offset = (page - 1) * limit query += " ORDER BY a.created_at DESC LIMIT ? OFFSET ?" params.extend([limit, offset]) cursor.execute(query, params) applications = cursor.fetchall() # Получаем общее количество cursor.execute("SELECT COUNT(*) FROM applications WHERE user_id = ?", (user_id,)) total = cursor.fetchone()[0] result = [] for app in applications: app_dict = dict(app) result.append(app_dict) return { "applications": result, "total": total, "page": page, "total_pages": (total + limit - 1) // limit } @app.get("/api/applications/{application_id}") async def get_application( application_id: int, user_id: int = Depends(get_current_user) ): """Получение детальной информации об отклике""" with get_db() as conn: cursor = conn.cursor() # Получаем отклик с данными cursor.execute(""" SELECT a.*, v.title as vacancy_title, v.description as vacancy_description, v.salary as vacancy_salary, v.contact as vacancy_contact, u_applicant.full_name as applicant_name, u_applicant.email as applicant_email, u_applicant.phone as applicant_phone, u_applicant.telegram as applicant_telegram, u_employer.full_name as employer_name, u_employer.email as employer_email, u_employer.telegram as employer_telegram, c.name as company_name, c.description as company_description FROM applications a JOIN vacancies v ON a.vacancy_id = v.id JOIN users u_applicant ON a.user_id = u_applicant.id JOIN users u_employer ON v.user_id = u_employer.id LEFT JOIN companies c ON u_employer.id = c.user_id WHERE a.id = ? """, (application_id,)) application = cursor.fetchone() if not application: raise HTTPException(status_code=404, detail="Отклик не найден") # Проверяем права доступа (либо автор отклика, либо владелец вакансии) if application["user_id"] != user_id and application["v_user_id"] != user_id: raise HTTPException(status_code=403, detail="Нет доступа к этому отклику") # Если просматривает работодатель и статус 'pending', меняем на 'viewed' if application["v_user_id"] == user_id and application["status"] == 'pending': cursor.execute(""" UPDATE applications SET status = 'viewed', viewed_at = CURRENT_TIMESTAMP WHERE id = ? """, (application_id,)) conn.commit() application = dict(application) application["status"] = 'viewed' return dict(application) @app.put("/api/applications/{application_id}/status") async def update_application_status( application_id: int, status_update: ApplicationStatusUpdate, user_id: int = Depends(get_current_user) ): """Обновление статуса отклика (для работодателя)""" with get_db() as conn: cursor = conn.cursor() # Проверяем, что отклик существует и пользователь - владелец вакансии cursor.execute(""" SELECT a.*, v.user_id as employer_id FROM applications a JOIN vacancies v ON a.vacancy_id = v.id WHERE a.id = ? """, (application_id,)) application = cursor.fetchone() if not application: raise HTTPException(status_code=404, detail="Отклик не найден") if application["employer_id"] != user_id: raise HTTPException(status_code=403, detail="Только владелец вакансии может изменять статус") # Обновляем статус cursor.execute(""" UPDATE applications SET status = ?, response_message = ?, response_at = CURRENT_TIMESTAMP WHERE id = ? """, (status_update.status, status_update.response_message, application_id)) conn.commit() return {"message": "Статус обновлен"} @app.get("/api/applications/stats") async def get_application_stats(user_id: int = Depends(get_current_user)): """Получение статистики по откликам""" with get_db() as conn: cursor = conn.cursor() # Проверяем роль пользователя cursor.execute("SELECT role FROM users WHERE id = ?", (user_id,)) user = cursor.fetchone() stats = {} if user["role"] == "employer": # Статистика для работодателя (полученные отклики) cursor.execute(""" SELECT COUNT(*) as total, SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending, SUM(CASE WHEN status = 'viewed' THEN 1 ELSE 0 END) as viewed, SUM(CASE WHEN status = 'accepted' THEN 1 ELSE 0 END) as accepted, SUM(CASE WHEN status = 'rejected' THEN 1 ELSE 0 END) as rejected FROM applications a JOIN vacancies v ON a.vacancy_id = v.id WHERE v.user_id = ? """, (user_id,)) else: # Статистика для соискателя (отправленные отклики) cursor.execute(""" SELECT COUNT(*) as total, SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending, SUM(CASE WHEN status = 'viewed' THEN 1 ELSE 0 END) as viewed, SUM(CASE WHEN status = 'accepted' THEN 1 ELSE 0 END) as accepted, SUM(CASE WHEN status = 'rejected' THEN 1 ELSE 0 END) as rejected FROM applications WHERE user_id = ? """, (user_id,)) stats = dict(cursor.fetchone()) return stats # ========== АДМИНСКИЕ ЭНДПОИНТЫ ========== @app.get("/api/admin/stats") async def get_admin_stats(user_id: int = Depends(get_current_user)): """Получение статистики для админки""" with get_db() as conn: cursor = conn.cursor() # Проверяем, админ ли пользователь cursor.execute("SELECT is_admin FROM users WHERE id = ?", (user_id,)) user = cursor.fetchone() if not user or not user["is_admin"]: raise HTTPException(status_code=403, detail="Доступ запрещен") stats = {} cursor.execute("SELECT COUNT(*) FROM users") stats["total_users"] = cursor.fetchone()[0] cursor.execute("SELECT COUNT(*) FROM users WHERE role = 'employee'") stats["total_employees"] = cursor.fetchone()[0] cursor.execute("SELECT COUNT(*) FROM users WHERE role = 'employer'") stats["total_employers"] = cursor.fetchone()[0] cursor.execute("SELECT COUNT(*) FROM vacancies WHERE is_active = 1") stats["active_vacancies"] = cursor.fetchone()[0] cursor.execute("SELECT COUNT(*) FROM resumes") stats["total_resumes"] = cursor.fetchone()[0] cursor.execute("SELECT COUNT(*) FROM applications") stats["total_applications"] = cursor.fetchone()[0] cursor.execute("SELECT COUNT(*) FROM tags") stats["total_tags"] = cursor.fetchone()[0] # Последние регистрации cursor.execute(""" SELECT id, full_name, email, role, created_at FROM users ORDER BY created_at DESC LIMIT 10 """) stats["recent_users"] = [dict(u) for u in cursor.fetchall()] # Популярные теги cursor.execute(""" SELECT t.name, COUNT(vt.vacancy_id) as vacancy_count, COUNT(rt.resume_id) as resume_count FROM tags t LEFT JOIN vacancy_tags vt ON t.id = vt.tag_id LEFT JOIN resume_tags rt ON t.id = rt.tag_id GROUP BY t.id ORDER BY (COUNT(vt.vacancy_id) + COUNT(rt.resume_id)) DESC LIMIT 20 """) stats["popular_tags"] = [dict(t) for t in cursor.fetchall()] return stats @app.get("/api/admin/users") async def get_all_users(user_id: int = Depends(get_current_user)): """Получение всех пользователей (для админа)""" with get_db() as conn: cursor = conn.cursor() cursor.execute("SELECT is_admin FROM users WHERE id = ?", (user_id,)) user = cursor.fetchone() if not user or not user["is_admin"]: raise HTTPException(status_code=403, detail="Доступ запрещен") cursor.execute(""" SELECT id, full_name, email, phone, telegram, role, is_admin, created_at FROM users ORDER BY created_at DESC """) users = cursor.fetchall() return [dict(u) for u in users] @app.delete("/api/admin/users/{target_user_id}") async def delete_user(target_user_id: int, user_id: int = Depends(get_current_user)): """Удаление пользователя (для админа)""" with get_db() as conn: cursor = conn.cursor() cursor.execute("SELECT is_admin FROM users WHERE id = ?", (user_id,)) user = cursor.fetchone() if not user or not user["is_admin"]: raise HTTPException(status_code=403, detail="Доступ запрещен") cursor.execute("DELETE FROM users WHERE id = ?", (target_user_id,)) conn.commit() return {"message": "Пользователь удален"} @app.get("/api/admin/vacancies") async def get_all_vacancies_admin(user_id: int = Depends(get_current_user)): """Получение всех вакансий (для админа)""" with get_db() as conn: cursor = conn.cursor() cursor.execute("SELECT is_admin FROM users WHERE id = ?", (user_id,)) user = cursor.fetchone() if not user or not user["is_admin"]: raise HTTPException(status_code=403, detail="Доступ запрещен") cursor.execute(""" SELECT v.*, u.full_name as company_name FROM vacancies v JOIN users u ON v.user_id = u.id ORDER BY v.created_at DESC """) vacancies = cursor.fetchall() result = [] for v in vacancies: vacancy_dict = dict(v) vacancy_dict["tags"] = get_tags_for_vacancy(cursor, v["id"]) result.append(vacancy_dict) return result @app.delete("/api/admin/vacancies/{vacancy_id}") async def delete_vacancy_admin(vacancy_id: int, user_id: int = Depends(get_current_user)): """Удаление вакансии (для админа)""" with get_db() as conn: cursor = conn.cursor() cursor.execute("SELECT is_admin FROM users WHERE id = ?", (user_id,)) user = cursor.fetchone() if not user or not user["is_admin"]: raise HTTPException(status_code=403, detail="Доступ запрещен") cursor.execute("DELETE FROM vacancies WHERE id = ?", (vacancy_id,)) conn.commit() return {"message": "Вакансия удалена"} # ========== ПОЛЬЗОВАТЕЛЬСКИЕ ЭНДПОИНТЫ ========== @app.get("/api/user") async def get_user_info(user_id: int = Depends(get_current_user)): """Получение информации о текущем пользователе""" with get_db() as conn: cursor = conn.cursor() cursor.execute(""" SELECT id, full_name, email, phone, telegram, role, is_admin, created_at FROM users WHERE id = ? """, (user_id,)) user = cursor.fetchone() if not user: raise HTTPException(status_code=404, detail="Пользователь не найден") return dict(user) @app.get("/api/users/{target_user_id}/contact") async def get_user_contact( target_user_id: int, current_user: int = Depends(get_current_user) ): """Получение контактных данных пользователя""" with get_db() as conn: cursor = conn.cursor() cursor.execute("SELECT email, phone, telegram FROM users WHERE id = ?", (target_user_id,)) user = cursor.fetchone() if not user: raise HTTPException(status_code=404, detail="Пользователь не найден") return dict(user) # Инициализация базы данных init_db() set_initial_sequence_ids() if __name__ == "__main__": print("🚀 Запуск сервера на http://localhost:8000") print("📚 Документация API: http://localhost:8000/docs") print("👤 Админ: admin@rabota.today / admin123") print("📁 HTML страницы должны быть в папке templates/") uvicorn.run(app, host="0.0.0.0", port=8000)