Files
yarmarka/server.py
2026-03-16 18:57:22 +03:00

2625 lines
96 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# main.py (полная версия с тегами и админкой)
from fastapi import FastAPI, HTTPException, Depends, status, Request, Form
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.responses import HTMLResponse, FileResponse, RedirectResponse
from fastapi.staticfiles import StaticFiles
from pydantic import BaseModel, EmailStr
from typing import Optional, List
from datetime import datetime, timedelta
import jwt
import sqlite3
import hashlib
import os
from contextlib import contextmanager
import uvicorn
from pathlib import Path
import traceback
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
import re
app = FastAPI(title="Rabota.Today API", version="2.0.0")
# Настройки
SECRET_KEY = "your-secret-key-here-change-in-production"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 60 * 24 * 7 # 7 дней
# CORS
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
security = HTTPBearer()
# Создаем директории
BASE_DIR = Path(__file__).parent
TEMPLATES_DIR = BASE_DIR / "templates"
STATIC_DIR = BASE_DIR / "static"
TEMPLATES_DIR.mkdir(exist_ok=True)
STATIC_DIR.mkdir(exist_ok=True)
(STATIC_DIR / "css").mkdir(exist_ok=True)
(STATIC_DIR / "js").mkdir(exist_ok=True)
# Подключаем статические файлы
app.mount("/static", StaticFiles(directory=str(STATIC_DIR)), name="static")
# ========== МОДЕЛИ БАЗЫ ДАННЫХ ==========
def init_db():
"""Инициализация базы данных"""
with get_db() as conn:
cursor = conn.cursor()
# Таблица пользователей
cursor.execute("""
CREATE TABLE IF NOT EXISTS users (
id INTEGER PRIMARY KEY AUTOINCREMENT,
full_name TEXT NOT NULL,
email TEXT UNIQUE NOT NULL,
phone TEXT NOT NULL,
telegram TEXT,
password_hash TEXT NOT NULL,
role TEXT NOT NULL CHECK(role IN ('employee', 'employer', 'admin')),
is_admin BOOLEAN DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Таблица вакансий
cursor.execute("""
CREATE TABLE IF NOT EXISTS vacancies (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
title TEXT NOT NULL,
salary TEXT,
description TEXT,
contact TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
is_active BOOLEAN DEFAULT 1,
views INTEGER DEFAULT 0,
FOREIGN KEY (user_id) REFERENCES users (id)
)
""")
# Таблица резюме
cursor.execute("""
CREATE TABLE IF NOT EXISTS resumes (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER UNIQUE NOT NULL,
desired_position TEXT,
about_me TEXT,
desired_salary TEXT,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
views INTEGER DEFAULT 0,
FOREIGN KEY (user_id) REFERENCES users (id)
)
""")
# Таблица тегов
cursor.execute("""
CREATE TABLE IF NOT EXISTS tags (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
category TEXT CHECK(category IN ('skill', 'industry', 'position', 'other')) DEFAULT 'skill',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
# Связь тегов с вакансиями
cursor.execute("""
CREATE TABLE IF NOT EXISTS vacancy_tags (
vacancy_id INTEGER NOT NULL,
tag_id INTEGER NOT NULL,
PRIMARY KEY (vacancy_id, tag_id),
FOREIGN KEY (vacancy_id) REFERENCES vacancies (id) ON DELETE CASCADE,
FOREIGN KEY (tag_id) REFERENCES tags (id) ON DELETE CASCADE
)
""")
# Связь тегов с резюме
cursor.execute("""
CREATE TABLE IF NOT EXISTS resume_tags (
resume_id INTEGER NOT NULL,
tag_id INTEGER NOT NULL,
PRIMARY KEY (resume_id, tag_id),
FOREIGN KEY (resume_id) REFERENCES resumes (id) ON DELETE CASCADE,
FOREIGN KEY (tag_id) REFERENCES tags (id) ON DELETE CASCADE
)
""")
# Таблица опыта работы (ОБНОВЛЕНА - добавлено поле description)
cursor.execute("""
CREATE TABLE IF NOT EXISTS work_experience (
id INTEGER PRIMARY KEY AUTOINCREMENT,
resume_id INTEGER NOT NULL,
position TEXT NOT NULL,
company TEXT NOT NULL,
period TEXT,
description TEXT,
FOREIGN KEY (resume_id) REFERENCES resumes (id) ON DELETE CASCADE
)
""")
# Таблица образования
cursor.execute("""
CREATE TABLE IF NOT EXISTS education (
id INTEGER PRIMARY KEY AUTOINCREMENT,
resume_id INTEGER NOT NULL,
institution TEXT NOT NULL,
specialty TEXT,
graduation_year TEXT,
FOREIGN KEY (resume_id) REFERENCES resumes (id) ON DELETE CASCADE
)
""")
# Таблица откликов (обновленная версия)
cursor.execute("""
CREATE TABLE IF NOT EXISTS applications (
id INTEGER PRIMARY KEY AUTOINCREMENT,
vacancy_id INTEGER NOT NULL,
user_id INTEGER NOT NULL,
message TEXT,
status TEXT DEFAULT 'pending' CHECK(status IN ('pending', 'viewed', 'accepted', 'rejected')),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
viewed_at TIMESTAMP,
response_message TEXT,
response_at TIMESTAMP,
FOREIGN KEY (vacancy_id) REFERENCES vacancies (id) ON DELETE CASCADE,
FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE
)
""")
# Таблица избранного
cursor.execute("""
CREATE TABLE IF NOT EXISTS favorites (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER NOT NULL,
item_type TEXT NOT NULL CHECK(item_type IN ('vacancy', 'resume')),
item_id INTEGER NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(user_id, item_type, item_id),
FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE
)
""")
# Таблица компаний
cursor.execute("""
CREATE TABLE IF NOT EXISTS companies (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id INTEGER UNIQUE NOT NULL,
name TEXT NOT NULL,
description TEXT,
website TEXT,
logo TEXT,
address TEXT,
phone TEXT,
email TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
FOREIGN KEY (user_id) REFERENCES users (id) ON DELETE CASCADE
)
""")
# Создаем индексы для быстрого поиска
cursor.execute("CREATE INDEX IF NOT EXISTS idx_applications_vacancy ON applications(vacancy_id)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_applications_user ON applications(user_id)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_applications_status ON applications(status)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_favorites_user ON favorites(user_id)")
cursor.execute("CREATE INDEX IF NOT EXISTS idx_favorites_item ON favorites(item_type, item_id)")
# Создаем админа по умолчанию
admin_password = hash_password("admin123")
cursor.execute("""
INSERT OR IGNORE INTO users (full_name, email, phone, password_hash, role, is_admin)
VALUES ('Admin', 'admin@rabota.today', '+70000000000', ?, 'admin', 1)
""", (admin_password,))
# Добавляем популярные теги
default_tags = [
('Python', 'skill'),
('JavaScript', 'skill'),
('React', 'skill'),
('Vue.js', 'skill'),
('Node.js', 'skill'),
('Java', 'skill'),
('C++', 'skill'),
('PHP', 'skill'),
('HTML/CSS', 'skill'),
('SQL', 'skill'),
('Менеджер', 'position'),
('Разработчик', 'position'),
('Дизайнер', 'position'),
('Маркетолог', 'position'),
('Аналитик', 'position'),
('IT', 'industry'),
('Финансы', 'industry'),
('Медицина', 'industry'),
('Образование', 'industry'),
('Продажи', 'industry'),
]
for tag_name, category in default_tags:
cursor.execute("INSERT OR IGNORE INTO tags (name, category) VALUES (?, ?)", (tag_name, category))
conn.commit()
print("✅ База данных инициализирована")
def set_initial_sequence_ids():
"""Установка начальных ID для таблиц при первом запуске"""
with get_db() as conn:
cursor = conn.cursor()
# Проверяем, есть ли уже записи в таблицах
cursor.execute("SELECT COUNT(*) FROM vacancies")
vacancies_count = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM resumes")
resumes_count = cursor.fetchone()[0]
# Если таблицы пустые, устанавливаем начальные ID
if vacancies_count == 0:
cursor.execute("INSERT OR REPLACE INTO sqlite_sequence (name, seq) VALUES ('vacancies', 9999)")
print("✅ Установлен начальный ID для вакансий: 10000")
if resumes_count == 0:
cursor.execute("INSERT OR REPLACE INTO sqlite_sequence (name, seq) VALUES ('resumes', 9999)")
print("✅ Установлен начальный ID для резюме: 10000")
conn.commit()
@contextmanager
def get_db():
"""Контекстный менеджер для работы с БД"""
conn = sqlite3.connect("rabota_today.db")
conn.row_factory = sqlite3.Row
try:
yield conn
finally:
conn.close()
# ========== МОДЕЛИ PYDANTIC ==========
class UserRegister(BaseModel):
full_name: str
email: EmailStr
phone: str
telegram: Optional[str] = None
password: str
role: str
class UserLogin(BaseModel):
email: EmailStr
password: str
class TokenResponse(BaseModel):
access_token: str
token_type: str
user_id: int
full_name: str
role: str
is_admin: bool
class Tag(BaseModel):
id: Optional[int] = None
name: str
category: str = "skill"
class VacancyCreate(BaseModel):
title: str
salary: Optional[str] = None
description: Optional[str] = None
contact: Optional[str] = None
tags: List[str] = [] # список названий тегов
class VacancyResponse(BaseModel):
id: int
title: str
salary: Optional[str]
description: Optional[str]
contact: Optional[str]
created_at: str
is_active: bool
views: int
tags: List[Tag] = []
company_name: Optional[str] = None
class VacancyUpdate(BaseModel):
title: Optional[str] = None
salary: Optional[str] = None
description: Optional[str] = None
contact: Optional[str] = None
tags: Optional[List[str]] = None
is_active: Optional[bool] = None
class WorkExperience(BaseModel):
position: str
company: str
period: Optional[str] = None
description: Optional[str] = None
class Education(BaseModel):
institution: str
specialty: Optional[str] = None
graduation_year: Optional[str] = None
class ResumeCreate(BaseModel):
desired_position: Optional[str] = None
about_me: Optional[str] = None
desired_salary: Optional[str] = None
work_experience: List[WorkExperience] = []
education: List[Education] = []
tags: List[str] = [] # список названий тегов
class ResumeResponse(BaseModel):
id: int
user_id: int
desired_position: Optional[str]
about_me: Optional[str]
desired_salary: Optional[str]
work_experience: List[WorkExperience]
education: List[Education]
tags: List[Tag] = []
views: int
updated_at: str
full_name: Optional[str] = None
class CompanyCreate(BaseModel):
name: str
description: Optional[str] = None
website: Optional[str] = None
logo: Optional[str] = None
address: Optional[str] = None
phone: Optional[str] = None
email: Optional[str] = None
class CompanyResponse(BaseModel):
id: int
user_id: int
name: str
description: Optional[str]
website: Optional[str]
logo: Optional[str]
address: Optional[str]
phone: Optional[str]
email: Optional[str]
created_at: str
updated_at: str
class FavoriteCreate(BaseModel):
item_type: str # 'vacancy' или 'resume'
item_id: int
class FavoriteResponse(BaseModel):
id: int
user_id: int
item_type: str
item_id: int
created_at: str
# Для расширенной информации
item_data: Optional[dict] = None
class FavoriteItem(BaseModel):
id: int
type: str
title: str
subtitle: str
salary: Optional[str] = None
company: Optional[str] = None
name: Optional[str] = None
tags: List[str] = []
created_at: str
url: str
class ApplicationCreate(BaseModel):
vacancy_id: int
message: Optional[str] = None
class ApplicationResponse(BaseModel):
id: int
vacancy_id: int
user_id: int
message: Optional[str]
status: str
created_at: str
viewed_at: Optional[str]
response_message: Optional[str]
response_at: Optional[str]
# Данные о вакансии
vacancy_title: Optional[str] = None
company_name: Optional[str] = None
# Данные о соискателе
user_name: Optional[str] = None
user_email: Optional[str] = None
user_phone: Optional[str] = None
user_telegram: Optional[str] = None
class ApplicationStatusUpdate(BaseModel):
status: str # 'viewed', 'accepted', 'rejected'
response_message: Optional[str] = None
class ApplicationStats(BaseModel):
total: int
pending: int
viewed: int
accepted: int
rejected: int
# ========== ВСПОМОГАТЕЛЬНЫЕ ФУНКЦИИ ==========
def hash_password(password: str) -> str:
"""Хеширование пароля"""
salt = "rabota.today.salt"
return hashlib.sha256((password + salt).encode()).hexdigest()
def verify_password(password: str, hash: str) -> bool:
"""Проверка пароля"""
return hash_password(password) == hash
def create_access_token(data: dict):
"""Создание JWT токена с явным преобразованием ID в строку"""
to_encode = data.copy()
expire = datetime.utcnow() + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
to_encode.update({"exp": expire})
# Убеждаемся, что user_id сохраняется как строка
if "sub" in to_encode:
to_encode["sub"] = str(to_encode["sub"])
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def get_current_user(credentials: HTTPAuthorizationCredentials = Depends(security)):
"""Получение текущего пользователя из токена с улучшенной обработкой ошибок"""
token = credentials.credentials
try:
# Декодируем токен
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
# Получаем user_id из поля sub
user_id = payload.get("sub")
if user_id is None:
print("❌ No sub in token payload")
raise HTTPException(status_code=401, detail="Невалидный токен: отсутствует идентификатор пользователя")
# Пробуем преобразовать в整数
try:
user_id_int = int(user_id)
except (ValueError, TypeError) as e:
print(f"❌ Cannot convert user_id to int: {user_id}, error: {e}")
raise HTTPException(status_code=401, detail=f"Невалидный формат ID пользователя: {user_id}")
print(f"✅ User authenticated: {user_id_int}")
return user_id_int
except jwt.ExpiredSignatureError:
print("❌ Token expired")
raise HTTPException(status_code=401, detail="Токен истек")
except jwt.InvalidTokenError as e:
print(f"❌ Invalid token: {e}")
raise HTTPException(status_code=401, detail=f"Невалидный токен: {str(e)}")
except Exception as e:
print(f"❌ Unexpected error in get_current_user: {e}")
traceback.print_exc()
raise HTTPException(status_code=401, detail=f"Ошибка авторизации: {str(e)}")
def get_current_user_optional(token: Optional[str] = None):
"""Опциональное получение пользователя (для публичных страниц)"""
if not token:
return None
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
return int(payload.get("sub"))
except:
return None
def get_tags_for_vacancy(cursor, vacancy_id):
"""Получение тегов для вакансии"""
cursor.execute("""
SELECT t.* FROM tags t
JOIN vacancy_tags vt ON t.id = vt.tag_id
WHERE vt.vacancy_id = ?
""", (vacancy_id,))
return [dict(tag) for tag in cursor.fetchall()]
def get_tags_for_resume(cursor, resume_id):
"""Получение тегов для резюме"""
cursor.execute("""
SELECT t.* FROM tags t
JOIN resume_tags rt ON t.id = rt.tag_id
WHERE rt.resume_id = ?
""", (resume_id,))
return [dict(tag) for tag in cursor.fetchall()]
def add_tags_to_vacancy(cursor, vacancy_id, tag_names):
"""Добавление тегов к вакансии"""
for tag_name in tag_names:
# Ищем или создаем тег
cursor.execute("SELECT id FROM tags WHERE name = ?", (tag_name,))
tag = cursor.fetchone()
if tag:
tag_id = tag["id"]
else:
cursor.execute("INSERT INTO tags (name) VALUES (?)", (tag_name,))
tag_id = cursor.lastrowid
# Связываем тег с вакансией
cursor.execute("""
INSERT OR IGNORE INTO vacancy_tags (vacancy_id, tag_id)
VALUES (?, ?)
""", (vacancy_id, tag_id))
def add_tags_to_resume(cursor, resume_id, tag_names):
"""Добавление тегов к резюме"""
for tag_name in tag_names:
cursor.execute("SELECT id FROM tags WHERE name = ?", (tag_name,))
tag = cursor.fetchone()
if tag:
tag_id = tag["id"]
else:
cursor.execute("INSERT INTO tags (name) VALUES (?)", (tag_name,))
tag_id = cursor.lastrowid
cursor.execute("""
INSERT OR IGNORE INTO resume_tags (resume_id, tag_id)
VALUES (?, ?)
""", (resume_id, tag_id))
# ========== HTML СТРАНИЦЫ ==========
@app.get("/", response_class=HTMLResponse)
async def get_index(request: Request):
"""Главная страница с определением устройства"""
# Определяем мобильное устройство по User-Agent
user_agent = request.headers.get("user-agent", "").lower()
is_mobile = any([
"mobile" in user_agent,
"android" in user_agent,
"iphone" in user_agent,
"ipad" in user_agent
])
if is_mobile:
file_path = TEMPLATES_DIR / "index_mobile.html"
else:
file_path = TEMPLATES_DIR / "index.html"
if file_path.exists():
with open(file_path, "r", encoding="utf-8") as f:
return HTMLResponse(content=f.read())
return HTMLResponse(content="<h1>Rabota.Today</h1><p>Страница не найдена</p>")
@app.get("/login", response_class=HTMLResponse)
async def get_login():
"""Страница входа"""
file_path = TEMPLATES_DIR / "login.html"
return FileResponse(file_path) if file_path.exists() else HTMLResponse(
content="<h1>Страница входа</h1><p>Создайте файл templates/login.html</p>")
@app.get("/companies", response_class=HTMLResponse)
async def get_companies_page():
"""Страница со списком компаний"""
file_path = TEMPLATES_DIR / "companies.html"
if file_path.exists():
with open(file_path, "r", encoding="utf-8") as f:
return HTMLResponse(content=f.read())
return HTMLResponse(content="<h1>Компании</h1><p>Страница в разработке</p>")
@app.get("/company/{company_id}", response_class=HTMLResponse)
async def get_company_page(company_id: int):
"""Страница компании"""
file_path = TEMPLATES_DIR / "company_detail.html"
if file_path.exists():
with open(file_path, "r", encoding="utf-8") as f:
return HTMLResponse(content=f.read())
return HTMLResponse(content="<h1>Страница компании</h1><p>Файл не найден</p>")
@app.get("/debug_simple", response_class=HTMLResponse)
async def debug():
"""Страница debug"""
file_path = TEMPLATES_DIR / "debug.html"
return FileResponse(file_path) if file_path.exists() else HTMLResponse(
content="<h1>Страница входа</h1><p>Создайте файл templates/debug.html</p>")
@app.get("/debug", response_class=HTMLResponse)
async def get_debug_page():
"""Страница диагностики для мобильных устройств"""
file_path = "templates/mobile_debug_simple.html"
if os.path.exists(file_path):
with open(file_path, "r", encoding="utf-8") as f:
return HTMLResponse(content=f.read())
return HTMLResponse(content="""
<!DOCTYPE html>
<html>
<head><title>Debug</title></head>
<body>
<h1>Debug page not found</h1>
<p>Create file: templates/mobile_debug_simple.html</p>
</body>
</html>
""")
@app.get("/register", response_class=HTMLResponse)
async def get_register_page():
"""Отдельная страница регистрации"""
file_path = TEMPLATES_DIR / "register.html"
if not file_path.exists():
return HTMLResponse(content="<h1>Страница регистрации</h1><p>Создайте файл templates/register.html</p>")
return FileResponse(file_path)
@app.get("/register", response_class=HTMLResponse)
async def get_register():
"""Страница регистрации"""
file_path = TEMPLATES_DIR / "register.html"
return FileResponse(file_path) if file_path.exists() else HTMLResponse(
content="<h1>Страница регистрации</h1><p>Создайте файл templates/register.html</p>")
@app.get("/profile", response_class=HTMLResponse)
async def get_profile():
"""Страница профиля"""
file_path = TEMPLATES_DIR / "profile.html"
return FileResponse(file_path) if file_path.exists() else HTMLResponse(
content="<h1>Профиль</h1><p>Создайте файл templates/profile.html</p>")
@app.get("/vacancies", response_class=HTMLResponse)
async def get_vacancies():
"""Страница со списком вакансий"""
file_path = TEMPLATES_DIR / "vacancies.html"
return FileResponse(file_path) if file_path.exists() else HTMLResponse(
content="<h1>Вакансии</h1><p>Создайте файл templates/vacancies.html</p>")
@app.get("/vacancy/{vacancy_id}", response_class=HTMLResponse)
async def get_vacancy_detail(vacancy_id: int):
"""Страница конкретной вакансии"""
file_path = TEMPLATES_DIR / "vacancy_detail.html"
return FileResponse(file_path) if file_path.exists() else HTMLResponse(
content=f"<h1>Вакансия #{vacancy_id}</h1><p>Создайте файл templates/vacancy_detail.html</p>")
@app.get("/resumes", response_class=HTMLResponse)
async def get_resumes():
"""Страница со списком резюме"""
file_path = TEMPLATES_DIR / "resumes.html"
return FileResponse(file_path) if file_path.exists() else HTMLResponse(
content="<h1>Резюме</h1><p>Создайте файл templates/resumes.html</p>")
@app.get("/resume/{resume_id}", response_class=HTMLResponse)
async def get_resume_detail(resume_id: int):
"""Страница конкретного резюме"""
file_path = TEMPLATES_DIR / "resume_detail.html"
return FileResponse(file_path) if file_path.exists() else HTMLResponse(
content=f"<h1>Резюме #{resume_id}</h1><p>Создайте файл templates/resume_detail.html</p>")
@app.get("/favorites", response_class=HTMLResponse)
async def get_favorites_page():
"""Страница избранного"""
file_path = TEMPLATES_DIR / "favorites.html"
return FileResponse(file_path) if file_path.exists() else HTMLResponse(content="<h1>Избранное</h1><p>Создайте файл templates/favorites.html</p>")
@app.get("/applications", response_class=HTMLResponse)
async def get_applications_page():
"""Страница со списком откликов"""
file_path = TEMPLATES_DIR / "applications.html"
return FileResponse(file_path) if file_path.exists() else HTMLResponse(content="<h1>Отклики</h1><p>Создайте файл templates/applications.html</p>")
@app.get("/application/{application_id}", response_class=HTMLResponse)
async def get_application_page(application_id: int):
"""Детальная страница отклика"""
file_path = TEMPLATES_DIR / "application_detail.html"
return FileResponse(file_path) if file_path.exists() else HTMLResponse(content=f"<h1>Отклик #{application_id}</h1><p>Создайте файл templates/application_detail.html</p>")
@app.get("/admin", response_class=HTMLResponse)
async def get_admin():
"""Страница админки"""
file_path = TEMPLATES_DIR / "admin.html"
return FileResponse(file_path) if file_path.exists() else HTMLResponse(
content="<h1>Админка</h1><p>Создайте файл templates/admin.html</p>")
# ========== API ЭНДПОИНТЫ ==========
@app.get("/api/public/stats")
async def get_public_stats():
"""Публичная статистика для главной страницы"""
try:
with get_db() as conn:
cursor = conn.cursor()
# Активные вакансии
cursor.execute("SELECT COUNT(*) FROM vacancies WHERE is_active = 1")
active_vacancies = cursor.fetchone()[0]
# Все резюме
cursor.execute("SELECT COUNT(*) FROM resumes")
total_resumes = cursor.fetchone()[0]
# Работодатели (компании)
cursor.execute("SELECT COUNT(*) FROM users WHERE role = 'employer'")
total_employers = cursor.fetchone()[0]
# Все пользователи
cursor.execute("SELECT COUNT(*) FROM users")
total_users = cursor.fetchone()[0]
return {
"active_vacancies": active_vacancies,
"total_resumes": total_resumes,
"total_employers": total_employers,
"total_users": total_users
}
except Exception as e:
print(f"Error getting public stats: {e}")
return {
"active_vacancies": 0,
"total_resumes": 0,
"total_employers": 0,
"total_users": 0
}
@app.post("/api/register", response_model=TokenResponse)
async def register(user: UserRegister, request: Request):
"""Регистрация нового пользователя с поддержкой мобильных устройств"""
try:
# Валидация email
if not re.match(r"^[a-zA-Z0-9._%+-]+@[a-zA-Z0-9.-]+\.[a-zA-Z]{2,}$", user.email):
raise HTTPException(status_code=400, detail="Неверный формат email")
# Валидация телефона
phone_digits = re.sub(r"\D", "", user.phone)
if len(phone_digits) < 10:
raise HTTPException(status_code=400, detail="Неверный формат телефона")
with get_db() as conn:
cursor = conn.cursor()
# Проверка существования пользователя
cursor.execute("SELECT id FROM users WHERE email = ?", (user.email,))
if cursor.fetchone():
raise HTTPException(status_code=400, detail="Email уже зарегистрирован")
# Создание пользователя
password_hash = hash_password(user.password)
is_admin = 1 if user.email == "admin@rabota.today" else 0
cursor.execute("""
INSERT INTO users (full_name, email, phone, telegram, password_hash, role, is_admin)
VALUES (?, ?, ?, ?, ?, ?, ?)
""", (user.full_name, user.email, user.phone, user.telegram, password_hash, user.role, is_admin))
user_id = cursor.lastrowid
conn.commit()
# Создание токена с дополнительной информацией
token_data = {
"sub": str(user_id),
"role": user.role,
"is_admin": bool(is_admin),
"device": "mobile" if request.state.is_mobile else "desktop"
}
token = create_access_token(token_data)
response_data = {
"access_token": token,
"token_type": "bearer",
"user_id": user_id,
"full_name": user.full_name,
"role": user.role,
"is_admin": bool(is_admin)
}
# Добавляем заголовки для мобильных устройств
if request.state.is_mobile:
return JSONResponse(
content=response_data,
headers={
"Access-Control-Allow-Credentials": "true",
"Access-Control-Expose-Headers": "Authorization"
}
)
return response_data
except HTTPException:
raise
except Exception as e:
print(f"Registration error: {e}")
raise HTTPException(status_code=500, detail="Внутренняя ошибка сервера")
@app.post("/api/login", response_model=TokenResponse)
async def login(user_data: UserLogin, request: Request):
"""Вход в систему с поддержкой мобильных устройств"""
try:
with get_db() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM users WHERE email = ?", (user_data.email,))
user = cursor.fetchone()
if not user or not verify_password(user_data.password, user["password_hash"]):
raise HTTPException(status_code=401, detail="Неверный email или пароль")
token_data = {
"sub": str(user["id"]),
"role": user["role"],
"is_admin": user["is_admin"],
"device": "mobile" if request.state.is_mobile else "desktop"
}
token = create_access_token(token_data)
response_data = {
"access_token": token,
"token_type": "bearer",
"user_id": user["id"],
"full_name": user["full_name"],
"role": user["role"],
"is_admin": bool(user["is_admin"])
}
if request.state.is_mobile:
return JSONResponse(
content=response_data,
headers={
"Access-Control-Allow-Credentials": "true",
"Access-Control-Expose-Headers": "Authorization"
}
)
return response_data
except HTTPException:
raise
except Exception as e:
print(f"Login error: {e}")
raise HTTPException(status_code=500, detail="Внутренняя ошибка сервера")
@app.get("/api/tags")
async def get_all_tags(category: Optional[str] = None):
"""Получение всех тегов"""
with get_db() as conn:
cursor = conn.cursor()
if category:
cursor.execute("SELECT * FROM tags WHERE category = ? ORDER BY name", (category,))
else:
cursor.execute("SELECT * FROM tags ORDER BY category, name")
tags = cursor.fetchall()
return [dict(tag) for tag in tags]
@app.post("/api/tags")
async def create_tag(tag: Tag, user_id: int = Depends(get_current_user)):
"""Создание нового тега (только для админа)"""
with get_db() as conn:
cursor = conn.cursor()
# Проверяем, админ ли пользователь
cursor.execute("SELECT is_admin FROM users WHERE id = ?", (user_id,))
user = cursor.fetchone()
if not user or not user["is_admin"]:
raise HTTPException(status_code=403, detail="Только администратор может создавать теги")
try:
cursor.execute("INSERT INTO tags (name, category) VALUES (?, ?)", (tag.name, tag.category))
conn.commit()
tag_id = cursor.lastrowid
cursor.execute("SELECT * FROM tags WHERE id = ?", (tag_id,))
return dict(cursor.fetchone())
except sqlite3.IntegrityError:
raise HTTPException(status_code=400, detail="Тег с таким названием уже существует")
# ========== ЭНДПОИНТЫ ДЛЯ ВАКАНСИЙ ==========
@app.post("/api/vacancies", response_model=VacancyResponse)
async def create_vacancy(
vacancy: VacancyCreate,
user_id: int = Depends(get_current_user)
):
"""Создание новой вакансии"""
with get_db() as conn:
cursor = conn.cursor()
cursor.execute("SELECT role FROM users WHERE id = ?", (user_id,))
user = cursor.fetchone()
if not user or user["role"] != "employer":
raise HTTPException(status_code=403, detail="Только работодатели могут создавать вакансии")
cursor.execute("""
INSERT INTO vacancies (user_id, title, salary, description, contact)
VALUES (?, ?, ?, ?, ?)
""", (user_id, vacancy.title, vacancy.salary, vacancy.description, vacancy.contact))
vacancy_id = cursor.lastrowid
# Добавляем теги
if vacancy.tags:
add_tags_to_vacancy(cursor, vacancy_id, vacancy.tags)
conn.commit()
cursor.execute("SELECT * FROM vacancies WHERE id = ?", (vacancy_id,))
new_vacancy = dict(cursor.fetchone())
new_vacancy["tags"] = get_tags_for_vacancy(cursor, vacancy_id)
return new_vacancy
@app.get("/api/vacancies", response_model=List[VacancyResponse])
async def get_my_vacancies(user_id: int = Depends(get_current_user)):
"""Получение всех вакансий текущего пользователя с данными компании"""
with get_db() as conn:
cursor = conn.cursor()
# Получаем компанию пользователя
cursor.execute("SELECT name FROM companies WHERE user_id = ?", (user_id,))
company = cursor.fetchone()
company_name = company["name"] if company else None
cursor.execute("""
SELECT * FROM vacancies
WHERE user_id = ? AND is_active = 1
ORDER BY created_at DESC
""", (user_id,))
vacancies = cursor.fetchall()
result = []
for v in vacancies:
vacancy_dict = dict(v)
# Добавляем название компании (либо из таблицы companies, либо имя пользователя)
if company_name:
vacancy_dict["company_name"] = company_name
else:
cursor.execute("SELECT full_name FROM users WHERE id = ?", (user_id,))
user = cursor.fetchone()
vacancy_dict["company_name"] = user["full_name"] if user else None
# Добавляем теги
cursor.execute("""
SELECT t.* FROM tags t
JOIN vacancy_tags vt ON t.id = vt.tag_id
WHERE vt.vacancy_id = ?
""", (v["id"],))
vacancy_dict["tags"] = [dict(tag) for tag in cursor.fetchall()]
result.append(vacancy_dict)
return result
@app.get("/api/vacancies/all")
async def get_all_vacancies(
page: int = 1,
search: str = None,
tags: str = None,
min_salary: int = None,
sort: str = "newest",
company_id: int = None # Новый параметр
):
"""Получение всех вакансий с фильтрацией"""
with get_db() as conn:
cursor = conn.cursor()
query = """
SELECT DISTINCT
v.*,
COALESCE(c.name, u.full_name) as company_name
FROM vacancies v
JOIN users u ON v.user_id = u.id
LEFT JOIN companies c ON v.user_id = c.user_id
WHERE v.is_active = 1
"""
params = []
# Фильтр по компании
if company_id:
query += " AND c.id = ?"
params.append(company_id)
# Остальные фильтры...
if search:
query += " AND (v.title LIKE ? OR v.description LIKE ?)"
params.extend([f"%{search}%", f"%{search}%"])
if tags:
tag_list = [t.strip() for t in tags.split(',')]
placeholders = ','.join(['?'] * len(tag_list))
query += f"""
AND v.id IN (
SELECT vt.vacancy_id
FROM vacancy_tags vt
JOIN tags t ON vt.tag_id = t.id
WHERE t.name IN ({placeholders})
GROUP BY vt.vacancy_id
HAVING COUNT(DISTINCT t.name) = ?
)
"""
params.extend(tag_list + [len(tag_list)])
if min_salary:
query += " AND v.salary LIKE ?"
params.append(f"%{min_salary}%")
if sort == "salary_desc":
query += " ORDER BY v.salary DESC"
elif sort == "salary_asc":
query += " ORDER BY v.salary ASC"
else:
query += " ORDER BY v.created_at DESC"
# Пагинация
limit = 10
offset = (page - 1) * limit
query += " LIMIT ? OFFSET ?"
params.extend([limit, offset])
cursor.execute(query, params)
vacancies = cursor.fetchall()
result = []
for v in vacancies:
vacancy_dict = dict(v)
# Добавляем теги
cursor.execute("""
SELECT t.* FROM tags t
JOIN vacancy_tags vt ON t.id = vt.tag_id
WHERE vt.vacancy_id = ?
""", (v["id"],))
vacancy_dict["tags"] = [dict(tag) for tag in cursor.fetchall()]
result.append(vacancy_dict)
# Получаем общее количество
count_query = "SELECT COUNT(*) FROM vacancies WHERE is_active = 1"
if company_id:
count_query += " AND user_id = (SELECT user_id FROM companies WHERE id = ?)"
cursor.execute(count_query, (company_id,))
else:
cursor.execute(count_query)
total = cursor.fetchone()[0]
return {
"vacancies": result,
"total_pages": (total + limit - 1) // limit,
"current_page": page
}
@app.get("/api/vacancies/my-all")
async def get_my_all_vacancies(
user_id: int = Depends(get_current_user)
):
"""Получение ВСЕХ вакансий ТЕКУЩЕГО пользователя (включая неактивные)"""
with get_db() as conn:
cursor = conn.cursor()
# Получаем компанию пользователя
cursor.execute("SELECT name FROM companies WHERE user_id = ?", (user_id,))
company = cursor.fetchone()
company_name = company["name"] if company else None
# Получаем ВСЕ вакансии пользователя без фильтра по is_active
cursor.execute("""
SELECT * FROM vacancies
WHERE user_id = ?
ORDER BY
is_active DESC, -- сначала активные
created_at DESC -- потом по дате
""", (user_id,))
vacancies = cursor.fetchall()
result = []
for v in vacancies:
vacancy_dict = dict(v)
if company_name:
vacancy_dict["company_name"] = company_name
else:
cursor.execute("SELECT full_name FROM users WHERE id = ?", (user_id,))
user = cursor.fetchone()
vacancy_dict["company_name"] = user["full_name"] if user else None
# Добавляем теги
cursor.execute("""
SELECT t.* FROM tags t
JOIN vacancy_tags vt ON t.id = vt.tag_id
WHERE vt.vacancy_id = ?
""", (v["id"],))
vacancy_dict["tags"] = [dict(tag) for tag in cursor.fetchall()]
result.append(vacancy_dict)
return result
@app.get("/api/vacancies/{vacancy_id}")
async def get_vacancy(vacancy_id: int, token: Optional[str] = None):
"""Получение конкретной вакансии с данными компании"""
user_id = get_current_user_optional(token)
with get_db() as conn:
cursor = conn.cursor()
# Увеличиваем счетчик просмотров
cursor.execute("UPDATE vacancies SET views = views + 1 WHERE id = ?", (vacancy_id,))
# Получаем вакансию с данными компании
cursor.execute("""
SELECT
v.*,
COALESCE(c.name, u.full_name) as company_name,
c.description as company_description,
c.website as company_website,
c.logo as company_logo,
c.address as company_address,
c.phone as company_phone,
c.email as company_email,
u.email as user_email,
u.telegram as user_telegram
FROM vacancies v
JOIN users u ON v.user_id = u.id
LEFT JOIN companies c ON v.user_id = c.user_id
WHERE v.id = ?
""", (vacancy_id,))
vacancy = cursor.fetchone()
if not vacancy:
raise HTTPException(status_code=404, detail="Вакансия не найдена")
result = dict(vacancy)
# Добавляем теги
cursor.execute("""
SELECT t.* FROM tags t
JOIN vacancy_tags vt ON t.id = vt.tag_id
WHERE vt.vacancy_id = ?
""", (vacancy_id,))
result["tags"] = [dict(tag) for tag in cursor.fetchall()]
# Проверяем, откликался ли пользователь
if user_id:
cursor.execute("""
SELECT id FROM applications
WHERE vacancy_id = ? AND user_id = ?
""", (vacancy_id, user_id))
result["has_applied"] = cursor.fetchone() is not None
conn.commit()
return result
@app.put("/api/vacancies/{vacancy_id}")
async def update_vacancy(
vacancy_id: int,
vacancy_update: VacancyUpdate,
user_id: int = Depends(get_current_user)
):
"""Обновление существующей вакансии"""
with get_db() as conn:
cursor = conn.cursor()
# Проверяем, что вакансия принадлежит пользователю
cursor.execute("""
SELECT v.*, u.role
FROM vacancies v
JOIN users u ON v.user_id = u.id
WHERE v.id = ? AND v.user_id = ?
""", (vacancy_id, user_id))
vacancy = cursor.fetchone()
if not vacancy:
raise HTTPException(status_code=404, detail="Вакансия не найдена")
# Проверяем роль (только работодатель может редактировать)
if vacancy["role"] != "employer":
raise HTTPException(status_code=403, detail="Только работодатели могут редактировать вакансии")
# Обновляем основные поля
update_fields = []
params = []
if vacancy_update.title is not None:
update_fields.append("title = ?")
params.append(vacancy_update.title)
if vacancy_update.salary is not None:
update_fields.append("salary = ?")
params.append(vacancy_update.salary)
if vacancy_update.description is not None:
update_fields.append("description = ?")
params.append(vacancy_update.description)
if vacancy_update.contact is not None:
update_fields.append("contact = ?")
params.append(vacancy_update.contact)
if vacancy_update.is_active is not None:
update_fields.append("is_active = ?")
params.append(1 if vacancy_update.is_active else 0)
if update_fields:
query = f"UPDATE vacancies SET {', '.join(update_fields)} WHERE id = ?"
params.append(vacancy_id)
cursor.execute(query, params)
# Обновляем теги
if vacancy_update.tags is not None:
# Удаляем старые теги
cursor.execute("DELETE FROM vacancy_tags WHERE vacancy_id = ?", (vacancy_id,))
# Добавляем новые теги
for tag_name in vacancy_update.tags:
# Ищем или создаем тег
cursor.execute("SELECT id FROM tags WHERE name = ?", (tag_name,))
tag = cursor.fetchone()
if tag:
tag_id = tag["id"]
else:
cursor.execute("INSERT INTO tags (name) VALUES (?)", (tag_name,))
tag_id = cursor.lastrowid
cursor.execute("""
INSERT INTO vacancy_tags (vacancy_id, tag_id)
VALUES (?, ?)
""", (vacancy_id, tag_id))
conn.commit()
# Возвращаем обновленную вакансию
cursor.execute("""
SELECT v.*, COALESCE(c.name, u.full_name) as company_name
FROM vacancies v
JOIN users u ON v.user_id = u.id
LEFT JOIN companies c ON v.user_id = c.user_id
WHERE v.id = ?
""", (vacancy_id,))
updated_vacancy = dict(cursor.fetchone())
# Добавляем теги
cursor.execute("""
SELECT t.* FROM tags t
JOIN vacancy_tags vt ON t.id = vt.tag_id
WHERE vt.vacancy_id = ?
""", (vacancy_id,))
updated_vacancy["tags"] = [dict(tag) for tag in cursor.fetchall()]
return updated_vacancy
@app.post("/api/vacancies/{vacancy_id}/apply")
async def apply_to_vacancy(
vacancy_id: int,
user_id: int = Depends(get_current_user)
):
"""Отклик на вакансию"""
with get_db() as conn:
cursor = conn.cursor()
cursor.execute("SELECT id FROM vacancies WHERE id = ?", (vacancy_id,))
if not cursor.fetchone():
raise HTTPException(status_code=404, detail="Вакансия не найдена")
cursor.execute("""
SELECT id FROM applications
WHERE vacancy_id = ? AND user_id = ?
""", (vacancy_id, user_id))
if cursor.fetchone():
raise HTTPException(status_code=400, detail="Вы уже откликались на эту вакансию")
cursor.execute("""
INSERT INTO applications (vacancy_id, user_id)
VALUES (?, ?)
""", (vacancy_id, user_id))
conn.commit()
return {"message": "Отклик отправлен"}
# ========== ЭНДПОИНТЫ ДЛЯ РЕЗЮМЕ ==========
@app.post("/api/resume", response_model=ResumeResponse)
async def create_or_update_resume(
resume: ResumeCreate,
user_id: int = Depends(get_current_user)
):
"""Создание или обновление резюме"""
with get_db() as conn:
cursor = conn.cursor()
# Проверка роли
cursor.execute("SELECT role FROM users WHERE id = ?", (user_id,))
user = cursor.fetchone()
if not user or user["role"] != "employee":
raise HTTPException(status_code=403, detail="Только соискатели могут создавать резюме")
# Проверка существования резюме
cursor.execute("SELECT id FROM resumes WHERE user_id = ?", (user_id,))
existing = cursor.fetchone()
if existing:
# Обновление существующего резюме
cursor.execute("""
UPDATE resumes
SET desired_position = ?, about_me = ?, desired_salary = ?, updated_at = CURRENT_TIMESTAMP
WHERE user_id = ?
""", (resume.desired_position, resume.about_me, resume.desired_salary, user_id))
resume_id = existing["id"]
# Удаление старых записей
cursor.execute("DELETE FROM work_experience WHERE resume_id = ?", (resume_id,))
cursor.execute("DELETE FROM education WHERE resume_id = ?", (resume_id,))
cursor.execute("DELETE FROM resume_tags WHERE resume_id = ?", (resume_id,))
else:
# Создание нового резюме
cursor.execute("""
INSERT INTO resumes (user_id, desired_position, about_me, desired_salary)
VALUES (?, ?, ?, ?)
""", (user_id, resume.desired_position, resume.about_me, resume.desired_salary))
resume_id = cursor.lastrowid
# Добавление опыта работы (ОБНОВЛЕНО - добавили description)
for exp in resume.work_experience:
cursor.execute("""
INSERT INTO work_experience (resume_id, position, company, period, description)
VALUES (?, ?, ?, ?, ?)
""", (resume_id, exp.position, exp.company, exp.period, exp.description))
# Добавление образования
for edu in resume.education:
cursor.execute("""
INSERT INTO education (resume_id, institution, specialty, graduation_year)
VALUES (?, ?, ?, ?)
""", (resume_id, edu.institution, edu.specialty, edu.graduation_year))
# Добавление тегов
if resume.tags:
for tag_name in resume.tags:
cursor.execute("SELECT id FROM tags WHERE name = ?", (tag_name,))
tag = cursor.fetchone()
if tag:
tag_id = tag["id"]
else:
cursor.execute("INSERT INTO tags (name) VALUES (?)", (tag_name,))
tag_id = cursor.lastrowid
cursor.execute("""
INSERT OR IGNORE INTO resume_tags (resume_id, tag_id)
VALUES (?, ?)
""", (resume_id, tag_id))
conn.commit()
# Получение обновленного резюме
cursor.execute("SELECT * FROM resumes WHERE id = ?", (resume_id,))
resume_data = dict(cursor.fetchone())
cursor.execute("""
SELECT position, company, period, description
FROM work_experience
WHERE resume_id = ?
""", (resume_id,))
resume_data["work_experience"] = [dict(exp) for exp in cursor.fetchall()]
cursor.execute("""
SELECT institution, specialty, graduation_year
FROM education
WHERE resume_id = ?
""", (resume_id,))
resume_data["education"] = [dict(edu) for edu in cursor.fetchall()]
cursor.execute("""
SELECT t.* FROM tags t
JOIN resume_tags rt ON t.id = rt.tag_id
WHERE rt.resume_id = ?
""", (resume_id,))
resume_data["tags"] = [dict(tag) for tag in cursor.fetchall()]
return resume_data
@app.get("/api/resume", response_model=ResumeResponse)
async def get_my_resume(user_id: int = Depends(get_current_user)):
"""Получение резюме текущего пользователя"""
with get_db() as conn:
cursor = conn.cursor()
cursor.execute("SELECT * FROM resumes WHERE user_id = ?", (user_id,))
resume = cursor.fetchone()
if not resume:
raise HTTPException(status_code=404, detail="Резюме не найдено")
resume_data = dict(resume)
# Получаем опыт работы (ОБНОВЛЕНО - добавили поле description)
cursor.execute("""
SELECT position, company, period, description
FROM work_experience
WHERE resume_id = ?
ORDER BY
CASE
WHEN period IS NULL THEN 1
ELSE 0
END,
period DESC
""", (resume["id"],))
resume_data["work_experience"] = [dict(exp) for exp in cursor.fetchall()]
# Получаем образование
cursor.execute("""
SELECT institution, specialty, graduation_year
FROM education
WHERE resume_id = ?
ORDER BY graduation_year DESC
""", (resume["id"],))
resume_data["education"] = [dict(edu) for edu in cursor.fetchall()]
# Получаем теги
cursor.execute("""
SELECT t.* FROM tags t
JOIN resume_tags rt ON t.id = rt.tag_id
WHERE rt.resume_id = ?
""", (resume["id"],))
resume_data["tags"] = [dict(tag) for tag in cursor.fetchall()]
return resume_data
@app.get("/api/resumes/all")
async def get_all_resumes(
page: int = 1,
search: str = None,
tags: str = None,
min_salary: int = None
):
"""Получение всех резюме с фильтрацией по тегам"""
with get_db() as conn:
cursor = conn.cursor()
query = """
SELECT DISTINCT r.*, u.full_name
FROM resumes r
JOIN users u ON r.user_id = u.id
WHERE 1=1
"""
params = []
if search:
query += " AND (r.desired_position LIKE ? OR r.about_me LIKE ?)"
params.extend([f"%{search}%", f"%{search}%"])
if tags:
tag_list = [t.strip() for t in tags.split(',')]
placeholders = ','.join(['?'] * len(tag_list))
query += f"""
AND r.id IN (
SELECT rt.resume_id
FROM resume_tags rt
JOIN tags t ON rt.tag_id = t.id
WHERE t.name IN ({placeholders})
GROUP BY rt.resume_id
HAVING COUNT(DISTINCT t.name) = ?
)
"""
params.extend(tag_list + [len(tag_list)])
if min_salary:
query += " AND r.desired_salary LIKE ?"
params.append(f"%{min_salary}%")
# Пагинация
limit = 10
offset = (page - 1) * limit
query += " ORDER BY r.updated_at DESC LIMIT ? OFFSET ?"
params.extend([limit, offset])
cursor.execute(query, params)
resumes = cursor.fetchall()
result = []
for r in resumes:
resume_dict = dict(r)
resume_dict["tags"] = get_tags_for_resume(cursor, r["id"])
cursor.execute("SELECT COUNT(*) FROM work_experience WHERE resume_id = ?", (r["id"],))
resume_dict["experience_count"] = cursor.fetchone()[0]
result.append(resume_dict)
cursor.execute("SELECT COUNT(*) FROM resumes")
total = cursor.fetchone()[0]
return {
"resumes": result,
"total_pages": (total + limit - 1) // limit,
"current_page": page
}
@app.get("/api/resumes/{resume_id}")
async def get_resume(resume_id: int):
"""Получение конкретного резюме"""
with get_db() as conn:
cursor = conn.cursor()
# Увеличиваем счетчик просмотров
cursor.execute("UPDATE resumes SET views = views + 1 WHERE id = ?", (resume_id,))
cursor.execute("""
SELECT r.*, u.full_name, u.email, u.phone, u.telegram
FROM resumes r
JOIN users u ON r.user_id = u.id
WHERE r.id = ?
""", (resume_id,))
resume = cursor.fetchone()
if not resume:
raise HTTPException(status_code=404, detail="Резюме не найдено")
resume_data = dict(resume)
# Получаем опыт работы (ОБНОВЛЕНО - добавили поле description)
cursor.execute("""
SELECT position, company, period, description
FROM work_experience
WHERE resume_id = ?
ORDER BY
CASE
WHEN period IS NULL THEN 1
ELSE 0
END,
period DESC
""", (resume_id,))
resume_data["work_experience"] = [dict(exp) for exp in cursor.fetchall()]
# Получаем образование
cursor.execute("""
SELECT institution, specialty, graduation_year
FROM education
WHERE resume_id = ?
ORDER BY graduation_year DESC
""", (resume_id,))
resume_data["education"] = [dict(edu) for edu in cursor.fetchall()]
# Получаем теги
cursor.execute("""
SELECT t.* FROM tags t
JOIN resume_tags rt ON t.id = rt.tag_id
WHERE rt.resume_id = ?
""", (resume_id,))
resume_data["tags"] = [dict(tag) for tag in cursor.fetchall()]
conn.commit()
return resume_data
@app.get("/api/company", response_model=CompanyResponse)
async def get_my_company(user_id: int = Depends(get_current_user)):
"""Получение информации о компании текущего пользователя"""
with get_db() as conn:
cursor = conn.cursor()
# Проверяем, что пользователь - работодатель
cursor.execute("SELECT role FROM users WHERE id = ?", (user_id,))
user = cursor.fetchone()
if not user or user["role"] != "employer":
raise HTTPException(status_code=403, detail="Только работодатели могут иметь компанию")
cursor.execute("SELECT * FROM companies WHERE user_id = ?", (user_id,))
company = cursor.fetchone()
if not company:
# Если компании нет, возвращаем пустой объект с 404
raise HTTPException(status_code=404, detail="Компания не найдена")
return dict(company)
@app.post("/api/company", response_model=CompanyResponse)
async def create_or_update_company(
company: CompanyCreate,
user_id: int = Depends(get_current_user)
):
"""Создание или обновление информации о компании"""
with get_db() as conn:
cursor = conn.cursor()
# Проверяем, что пользователь - работодатель
cursor.execute("SELECT role FROM users WHERE id = ?", (user_id,))
user = cursor.fetchone()
if not user or user["role"] != "employer":
raise HTTPException(status_code=403, detail="Только работодатели могут создавать компанию")
# Проверяем существование компании
cursor.execute("SELECT id FROM companies WHERE user_id = ?", (user_id,))
existing = cursor.fetchone()
if existing:
# Обновление существующей компании
cursor.execute("""
UPDATE companies
SET name = ?, description = ?, website = ?, logo = ?,
address = ?, phone = ?, email = ?, updated_at = CURRENT_TIMESTAMP
WHERE user_id = ?
""", (company.name, company.description, company.website, company.logo,
company.address, company.phone, company.email, user_id))
company_id = existing["id"]
else:
# Создание новой компании
cursor.execute("""
INSERT INTO companies (user_id, name, description, website, logo, address, phone, email)
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
""", (user_id, company.name, company.description, company.website, company.logo,
company.address, company.phone, company.email))
company_id = cursor.lastrowid
conn.commit()
cursor.execute("SELECT * FROM companies WHERE id = ?", (company_id,))
return dict(cursor.fetchone())
@app.get("/api/companies/{company_id}")
async def get_company_by_id(company_id: int):
"""Получение информации о компании по ID (публичный эндпоинт)"""
with get_db() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT c.*, u.full_name as owner_name
FROM companies c
JOIN users u ON c.user_id = u.id
WHERE c.id = ?
""", (company_id,))
company = cursor.fetchone()
if not company:
raise HTTPException(status_code=404, detail="Компания не найдена")
return dict(company)
@app.get("/api/resumes/{resume_id}")
async def get_resume(resume_id: int):
"""Получение конкретного резюме"""
with get_db() as conn:
cursor = conn.cursor()
# Увеличиваем счетчик просмотров
cursor.execute("UPDATE resumes SET views = views + 1 WHERE id = ?", (resume_id,))
cursor.execute("""
SELECT r.*, u.full_name, u.email, u.phone, u.telegram
FROM resumes r
JOIN users u ON r.user_id = u.id
WHERE r.id = ?
""", (resume_id,))
resume = cursor.fetchone()
if not resume:
raise HTTPException(status_code=404, detail="Резюме не найдено")
resume_data = dict(resume)
# Получаем опыт работы
cursor.execute("SELECT position, company, period FROM work_experience WHERE resume_id = ?", (resume_id,))
resume_data["work_experience"] = [dict(exp) for exp in cursor.fetchall()]
# Получаем образование
cursor.execute("SELECT institution, specialty, graduation_year FROM education WHERE resume_id = ?",
(resume_id,))
resume_data["education"] = [dict(edu) for edu in cursor.fetchall()]
# Получаем теги
cursor.execute("""
SELECT t.* FROM tags t
JOIN resume_tags rt ON t.id = rt.tag_id
WHERE rt.resume_id = ?
""", (resume_id,))
resume_data["tags"] = [dict(tag) for tag in cursor.fetchall()]
conn.commit()
return resume_data
# ========== ЭНДПОИНТЫ ДЛЯ ИЗБРАННОГО ==========
@app.post("/api/favorites")
async def add_to_favorites(
favorite: FavoriteCreate,
user_id: int = Depends(get_current_user)
):
"""Добавление в избранное"""
with get_db() as conn:
cursor = conn.cursor()
# Проверяем существование элемента
if favorite.item_type == 'vacancy':
cursor.execute("SELECT id FROM vacancies WHERE id = ? AND is_active = 1", (favorite.item_id,))
else: # resume
cursor.execute("SELECT id FROM resumes WHERE id = ?", (favorite.item_id,))
if not cursor.fetchone():
raise HTTPException(status_code=404, detail=f"{favorite.item_type} не найден")
# Добавляем в избранное
try:
cursor.execute("""
INSERT INTO favorites (user_id, item_type, item_id)
VALUES (?, ?, ?)
""", (user_id, favorite.item_type, favorite.item_id))
conn.commit()
return {"message": "Добавлено в избранное"}
except sqlite3.IntegrityError:
raise HTTPException(status_code=400, detail="Уже в избранном")
@app.delete("/api/favorites")
async def remove_from_favorites(
favorite: FavoriteCreate,
user_id: int = Depends(get_current_user)
):
"""Удаление из избранного"""
with get_db() as conn:
cursor = conn.cursor()
cursor.execute("""
DELETE FROM favorites
WHERE user_id = ? AND item_type = ? AND item_id = ?
""", (user_id, favorite.item_type, favorite.item_id))
if cursor.rowcount == 0:
raise HTTPException(status_code=404, detail="Не найдено в избранном")
conn.commit()
return {"message": "Удалено из избранного"}
@app.get("/api/favorites")
async def get_favorites(
user_id: int = Depends(get_current_user),
item_type: Optional[str] = None
):
"""Получение списка избранного"""
with get_db() as conn:
cursor = conn.cursor()
query = "SELECT * FROM favorites WHERE user_id = ?"
params = [user_id]
if item_type:
query += " AND item_type = ?"
params.append(item_type)
query += " ORDER BY created_at DESC"
cursor.execute(query, params)
favorites = cursor.fetchall()
result = []
for fav in favorites:
fav_dict = dict(fav)
# Получаем дополнительные данные о элементе
if fav["item_type"] == 'vacancy':
cursor.execute("""
SELECT v.*, COALESCE(c.name, u.full_name) as company_name
FROM vacancies v
JOIN users u ON v.user_id = u.id
LEFT JOIN companies c ON v.user_id = c.user_id
WHERE v.id = ?
""", (fav["item_id"],))
item = cursor.fetchone()
if item:
item_dict = dict(item)
# Получаем теги вакансии
cursor.execute("""
SELECT t.name FROM tags t
JOIN vacancy_tags vt ON t.id = vt.tag_id
WHERE vt.vacancy_id = ?
""", (fav["item_id"],))
tags = [t["name"] for t in cursor.fetchall()]
fav_dict["item_data"] = {
"id": item["id"],
"title": item["title"],
"company": item_dict.get("company_name"),
"salary": item["salary"],
"tags": tags,
"url": f"/vacancy/{item['id']}"
}
else: # resume
cursor.execute("""
SELECT r.*, u.full_name
FROM resumes r
JOIN users u ON r.user_id = u.id
WHERE r.id = ?
""", (fav["item_id"],))
item = cursor.fetchone()
if item:
item_dict = dict(item)
# Получаем теги резюме
cursor.execute("""
SELECT t.name FROM tags t
JOIN resume_tags rt ON t.id = rt.tag_id
WHERE rt.resume_id = ?
""", (fav["item_id"],))
tags = [t["name"] for t in cursor.fetchall()]
fav_dict["item_data"] = {
"id": item["id"],
"title": item_dict.get("desired_position", "Резюме"),
"name": item["full_name"],
"salary": item["desired_salary"],
"tags": tags,
"url": f"/resume/{item['id']}"
}
result.append(fav_dict)
return result
@app.get("/api/favorites/check/{item_type}/{item_id}")
async def check_favorite(
item_type: str,
item_id: int,
user_id: int = Depends(get_current_user)
):
"""Проверка, находится ли элемент в избранном"""
with get_db() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT id FROM favorites
WHERE user_id = ? AND item_type = ? AND item_id = ?
""", (user_id, item_type, item_id))
return {"is_favorite": cursor.fetchone() is not None}
@app.get("/api/user/stats")
async def get_user_stats(user_id: int = Depends(get_current_user)):
"""Получение статистики пользователя (количество избранного, просмотров и т.д.)"""
with get_db() as conn:
cursor = conn.cursor()
stats = {}
# Количество избранных вакансий
cursor.execute("""
SELECT COUNT(*) FROM favorites
WHERE user_id = ? AND item_type = 'vacancy'
""", (user_id,))
stats["favorite_vacancies"] = cursor.fetchone()[0]
# Количество избранных резюме
cursor.execute("""
SELECT COUNT(*) FROM favorites
WHERE user_id = ? AND item_type = 'resume'
""", (user_id,))
stats["favorite_resumes"] = cursor.fetchone()[0]
# Общее количество избранного
stats["total_favorites"] = stats["favorite_vacancies"] + stats["favorite_resumes"]
# Количество просмотров профиля (если есть резюме)
cursor.execute("""
SELECT SUM(views) FROM resumes WHERE user_id = ?
""", (user_id,))
stats["profile_views"] = cursor.fetchone()[0] or 0
# Количество созданных вакансий (для работодателя)
cursor.execute("""
SELECT COUNT(*) FROM vacancies
WHERE user_id = ? AND is_active = 1
""", (user_id,))
stats["active_vacancies"] = cursor.fetchone()[0]
return stats
# ========== ЭНДПОИНТЫ ДЛЯ ОТКЛИКОВ ==========
@app.post("/api/applications")
async def create_application(
application: ApplicationCreate,
user_id: int = Depends(get_current_user)
):
"""Создание отклика на вакансию"""
with get_db() as conn:
cursor = conn.cursor()
# Проверяем, что пользователь - соискатель
cursor.execute("SELECT role FROM users WHERE id = ?", (user_id,))
user = cursor.fetchone()
if not user or user["role"] != "employee":
raise HTTPException(status_code=403, detail="Только соискатели могут откликаться на вакансии")
# Проверяем существование вакансии
cursor.execute("""
SELECT v.*, u.id as employer_id
FROM vacancies v
JOIN users u ON v.user_id = u.id
WHERE v.id = ? AND v.is_active = 1
""", (application.vacancy_id,))
vacancy = cursor.fetchone()
if not vacancy:
raise HTTPException(status_code=404, detail="Вакансия не найдена")
# Проверяем, не откликался ли уже
cursor.execute("""
SELECT id FROM applications
WHERE vacancy_id = ? AND user_id = ?
""", (application.vacancy_id, user_id))
if cursor.fetchone():
raise HTTPException(status_code=400, detail="Вы уже откликались на эту вакансию")
# Создаем отклик
cursor.execute("""
INSERT INTO applications (vacancy_id, user_id, message)
VALUES (?, ?, ?)
""", (application.vacancy_id, user_id, application.message))
application_id = cursor.lastrowid
conn.commit()
# Получаем созданный отклик
cursor.execute("SELECT * FROM applications WHERE id = ?", (application_id,))
new_application = dict(cursor.fetchone())
return new_application
@app.get("/api/applications/received")
async def get_received_applications(
user_id: int = Depends(get_current_user),
status: Optional[str] = None,
page: int = 1,
limit: int = 20
):
"""Получение откликов на вакансии работодателя (входящие)"""
try:
with get_db() as conn:
cursor = conn.cursor()
# Проверяем, что пользователь - работодатель
cursor.execute("SELECT role FROM users WHERE id = ?", (user_id,))
user = cursor.fetchone()
if not user or user["role"] != "employer":
raise HTTPException(status_code=403, detail="Только работодатели могут просматривать отклики")
# Исправлено: убрали лишние поля, добавили правильные алиасы
query = """
SELECT
a.*,
v.title as vacancy_title,
v.salary as vacancy_salary,
u.full_name as user_name,
u.email as user_email,
u.phone as user_phone,
u.telegram as user_telegram
FROM applications a
JOIN vacancies v ON a.vacancy_id = v.id
JOIN users u ON a.user_id = u.id
WHERE v.user_id = ?
"""
params = [user_id]
if status:
query += " AND a.status = ?"
params.append(status)
# Пагинация
offset = (page - 1) * limit
query += " ORDER BY a.created_at DESC LIMIT ? OFFSET ?"
params.extend([limit, offset])
cursor.execute(query, params)
applications = cursor.fetchall()
# Получаем общее количество
count_query = "SELECT COUNT(*) FROM applications a JOIN vacancies v ON a.vacancy_id = v.id WHERE v.user_id = ?"
count_params = [user_id]
if status:
count_query += " AND a.status = ?"
count_params.append(status)
cursor.execute(count_query, count_params)
total = cursor.fetchone()[0]
result = []
for app in applications:
app_dict = dict(app)
result.append(app_dict)
return {
"applications": result,
"total": total,
"page": page,
"total_pages": (total + limit - 1) // limit
}
except HTTPException:
raise
except Exception as e:
print(f"Error in get_received_applications: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.get("/api/applications/stats")
async def get_application_stats(user_id: int = Depends(get_current_user)):
"""Получение статистики по откликам"""
try:
with get_db() as conn:
cursor = conn.cursor()
# Проверяем существование пользователя
cursor.execute("SELECT id, role FROM users WHERE id = ?", (user_id,))
user = cursor.fetchone()
if not user:
raise HTTPException(status_code=404, detail="Пользователь не найден")
# Базовая структура статистики
stats = {
"total": 0,
"pending": 0,
"viewed": 0,
"accepted": 0,
"rejected": 0
}
if user["role"] == "employer":
# Статистика для работодателя (полученные отклики)
cursor.execute("""
SELECT
COUNT(*) as total,
SUM(CASE WHEN a.status = 'pending' THEN 1 ELSE 0 END) as pending,
SUM(CASE WHEN a.status = 'viewed' THEN 1 ELSE 0 END) as viewed,
SUM(CASE WHEN a.status = 'accepted' THEN 1 ELSE 0 END) as accepted,
SUM(CASE WHEN a.status = 'rejected' THEN 1 ELSE 0 END) as rejected
FROM applications a
JOIN vacancies v ON a.vacancy_id = v.id
WHERE v.user_id = ?
""", (user_id,))
row = cursor.fetchone()
if row:
stats = {
"total": row[0] or 0,
"pending": row[1] or 0,
"viewed": row[2] or 0,
"accepted": row[3] or 0,
"rejected": row[4] or 0
}
elif user["role"] == "employee":
# Статистика для соискателя (отправленные отклики)
cursor.execute("""
SELECT
COUNT(*) as total,
SUM(CASE WHEN status = 'pending' THEN 1 ELSE 0 END) as pending,
SUM(CASE WHEN status = 'viewed' THEN 1 ELSE 0 END) as viewed,
SUM(CASE WHEN status = 'accepted' THEN 1 ELSE 0 END) as accepted,
SUM(CASE WHEN status = 'rejected' THEN 1 ELSE 0 END) as rejected
FROM applications
WHERE user_id = ?
""", (user_id,))
row = cursor.fetchone()
if row:
stats = {
"total": row[0] or 0,
"pending": row[1] or 0,
"viewed": row[2] or 0,
"accepted": row[3] or 0,
"rejected": row[4] or 0
}
# Добавляем информацию о роли для отладки
stats["role"] = user["role"]
return stats
except Exception as e:
print(f"❌ Error in get_application_stats: {e}")
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
@app.get("/api/applications/sent")
async def get_sent_applications(
user_id: int = Depends(get_current_user),
status: Optional[str] = None,
page: int = 1,
limit: int = 20
):
"""Получение отправленных откликов (для соискателя)"""
try:
with get_db() as conn:
cursor = conn.cursor()
# Проверяем, что пользователь - соискатель
cursor.execute("SELECT role FROM users WHERE id = ?", (user_id,))
user = cursor.fetchone()
if not user or user["role"] != "employee":
raise HTTPException(status_code=403, detail="Только соискатели могут просматривать свои отклики")
query = """
SELECT
a.*,
v.title as vacancy_title,
v.salary as vacancy_salary,
COALESCE(c.name, u_emp.full_name) as company_name,
u_emp.email as employer_email,
u_emp.telegram as employer_telegram
FROM applications a
JOIN vacancies v ON a.vacancy_id = v.id
JOIN users u_emp ON v.user_id = u_emp.id
LEFT JOIN companies c ON u_emp.id = c.user_id
WHERE a.user_id = ?
"""
params = [user_id]
if status:
query += " AND a.status = ?"
params.append(status)
# Пагинация
offset = (page - 1) * limit
query += " ORDER BY a.created_at DESC LIMIT ? OFFSET ?"
params.extend([limit, offset])
cursor.execute(query, params)
applications = cursor.fetchall()
# Получаем общее количество
cursor.execute("SELECT COUNT(*) FROM applications WHERE user_id = ?", (user_id,))
total = cursor.fetchone()[0]
result = []
for app in applications:
app_dict = dict(app)
result.append(app_dict)
return {
"applications": result,
"total": total,
"page": page,
"total_pages": (total + limit - 1) // limit
}
except HTTPException:
raise
except Exception as e:
print(f"Error in get_sent_applications: {e}")
raise HTTPException(status_code=500, detail=str(e))
@app.put("/api/applications/{application_id}/status")
async def update_application_status(
application_id: int,
status_update: ApplicationStatusUpdate,
user_id: int = Depends(get_current_user)
):
"""Обновление статуса отклика (для работодателя)"""
with get_db() as conn:
cursor = conn.cursor()
# Проверяем, что отклик существует и пользователь - владелец вакансии
cursor.execute("""
SELECT a.*, v.user_id as employer_id
FROM applications a
JOIN vacancies v ON a.vacancy_id = v.id
WHERE a.id = ?
""", (application_id,))
application = cursor.fetchone()
if not application:
raise HTTPException(status_code=404, detail="Отклик не найден")
if application["employer_id"] != user_id:
raise HTTPException(status_code=403, detail="Только владелец вакансии может изменять статус")
# Обновляем статус
cursor.execute("""
UPDATE applications
SET status = ?, response_message = ?, response_at = CURRENT_TIMESTAMP
WHERE id = ?
""", (status_update.status, status_update.response_message, application_id))
conn.commit()
return {"message": "Статус обновлен"}
@app.get("/api/applications/{application_id}")
async def get_application(
application_id: int,
user_id: int = Depends(get_current_user)
):
"""Получение детальной информации об отклике"""
try:
print(f"📄 Getting application details for ID: {application_id}")
with get_db() as conn:
cursor = conn.cursor()
# Сначала проверим, какие колонки есть в таблице
cursor.execute("PRAGMA table_info(applications)")
columns = [col[1] for col in cursor.fetchall()]
print(f"📊 Доступные колонки: {columns}")
# Динамически строим запрос в зависимости от наличия колонок
select_fields = """
a.*,
v.title as vacancy_title,
v.description as vacancy_description,
v.salary as vacancy_salary,
v.contact as vacancy_contact,
v.user_id as employer_id,
u_applicant.full_name as applicant_name,
u_applicant.email as applicant_email,
u_applicant.phone as applicant_phone,
u_applicant.telegram as applicant_telegram,
u_employer.full_name as employer_name,
u_employer.email as employer_email,
u_employer.telegram as employer_telegram,
c.name as company_name,
c.description as company_description
"""
cursor.execute(f"""
SELECT
{select_fields}
FROM applications a
JOIN vacancies v ON a.vacancy_id = v.id
JOIN users u_applicant ON a.user_id = u_applicant.id
JOIN users u_employer ON v.user_id = u_employer.id
LEFT JOIN companies c ON u_employer.id = c.user_id
WHERE a.id = ?
""", (application_id,))
application = cursor.fetchone()
if not application:
raise HTTPException(status_code=404, detail="Application not found")
app_dict = dict(application)
# Проверка прав доступа
if app_dict["user_id"] != user_id and app_dict["employer_id"] != user_id:
raise HTTPException(status_code=403, detail="Нет доступа к этому отклику")
# Если просматривает работодатель и статус 'pending', меняем на 'viewed'
# Проверяем, существует ли колонка viewed_at
if 'viewed_at' in columns and app_dict["employer_id"] == user_id and app_dict["status"] == 'pending':
cursor.execute("""
UPDATE applications
SET status = 'viewed', viewed_at = CURRENT_TIMESTAMP
WHERE id = ?
""", (application_id,))
conn.commit()
app_dict["status"] = 'viewed'
return app_dict
except HTTPException:
raise
except Exception as e:
print(f"❌ Error in get_application: {e}")
import traceback
traceback.print_exc()
raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
# ========== АДМИНСКИЕ ЭНДПОИНТЫ ==========
@app.get("/api/admin/stats")
async def get_admin_stats(user_id: int = Depends(get_current_user)):
"""Получение статистики для админки"""
with get_db() as conn:
cursor = conn.cursor()
# Проверяем, админ ли пользователь
cursor.execute("SELECT is_admin FROM users WHERE id = ?", (user_id,))
user = cursor.fetchone()
if not user or not user["is_admin"]:
raise HTTPException(status_code=403, detail="Доступ запрещен")
stats = {}
cursor.execute("SELECT COUNT(*) FROM users")
stats["total_users"] = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM users WHERE role = 'employee'")
stats["total_employees"] = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM users WHERE role = 'employer'")
stats["total_employers"] = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM vacancies WHERE is_active = 1")
stats["active_vacancies"] = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM resumes")
stats["total_resumes"] = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM applications")
stats["total_applications"] = cursor.fetchone()[0]
cursor.execute("SELECT COUNT(*) FROM tags")
stats["total_tags"] = cursor.fetchone()[0]
# Последние регистрации
cursor.execute("""
SELECT id, full_name, email, role, created_at
FROM users
ORDER BY created_at DESC
LIMIT 10
""")
stats["recent_users"] = [dict(u) for u in cursor.fetchall()]
# Популярные теги
cursor.execute("""
SELECT t.name, COUNT(vt.vacancy_id) as vacancy_count, COUNT(rt.resume_id) as resume_count
FROM tags t
LEFT JOIN vacancy_tags vt ON t.id = vt.tag_id
LEFT JOIN resume_tags rt ON t.id = rt.tag_id
GROUP BY t.id
ORDER BY (COUNT(vt.vacancy_id) + COUNT(rt.resume_id)) DESC
LIMIT 20
""")
stats["popular_tags"] = [dict(t) for t in cursor.fetchall()]
return stats
@app.get("/api/admin/users")
async def get_all_users(user_id: int = Depends(get_current_user)):
"""Получение всех пользователей (для админа)"""
with get_db() as conn:
cursor = conn.cursor()
cursor.execute("SELECT is_admin FROM users WHERE id = ?", (user_id,))
user = cursor.fetchone()
if not user or not user["is_admin"]:
raise HTTPException(status_code=403, detail="Доступ запрещен")
cursor.execute("""
SELECT id, full_name, email, phone, telegram, role, is_admin, created_at
FROM users
ORDER BY created_at DESC
""")
users = cursor.fetchall()
return [dict(u) for u in users]
@app.delete("/api/admin/users/{target_user_id}")
async def delete_user(target_user_id: int, user_id: int = Depends(get_current_user)):
"""Удаление пользователя (для админа)"""
with get_db() as conn:
cursor = conn.cursor()
cursor.execute("SELECT is_admin FROM users WHERE id = ?", (user_id,))
user = cursor.fetchone()
if not user or not user["is_admin"]:
raise HTTPException(status_code=403, detail="Доступ запрещен")
cursor.execute("DELETE FROM users WHERE id = ?", (target_user_id,))
conn.commit()
return {"message": "Пользователь удален"}
@app.get("/api/admin/vacancies")
async def get_all_vacancies_admin(user_id: int = Depends(get_current_user)):
"""Получение всех вакансий (для админа)"""
with get_db() as conn:
cursor = conn.cursor()
cursor.execute("SELECT is_admin FROM users WHERE id = ?", (user_id,))
user = cursor.fetchone()
if not user or not user["is_admin"]:
raise HTTPException(status_code=403, detail="Доступ запрещен")
cursor.execute("""
SELECT v.*, u.full_name as company_name
FROM vacancies v
JOIN users u ON v.user_id = u.id
ORDER BY v.created_at DESC
""")
vacancies = cursor.fetchall()
result = []
for v in vacancies:
vacancy_dict = dict(v)
vacancy_dict["tags"] = get_tags_for_vacancy(cursor, v["id"])
result.append(vacancy_dict)
return result
@app.delete("/api/admin/vacancies/{vacancy_id}")
async def delete_vacancy_admin(vacancy_id: int, user_id: int = Depends(get_current_user)):
"""Удаление вакансии (для админа)"""
with get_db() as conn:
cursor = conn.cursor()
cursor.execute("SELECT is_admin FROM users WHERE id = ?", (user_id,))
user = cursor.fetchone()
if not user or not user["is_admin"]:
raise HTTPException(status_code=403, detail="Доступ запрещен")
cursor.execute("DELETE FROM vacancies WHERE id = ?", (vacancy_id,))
conn.commit()
return {"message": "Вакансия удалена"}
# ========== ПОЛЬЗОВАТЕЛЬСКИЕ ЭНДПОИНТЫ ==========
@app.get("/api/user")
async def get_user_info(user_id: int = Depends(get_current_user)):
"""Получение информации о текущем пользователе"""
with get_db() as conn:
cursor = conn.cursor()
cursor.execute("""
SELECT id, full_name, email, phone, telegram, role, is_admin, created_at
FROM users WHERE id = ?
""", (user_id,))
user = cursor.fetchone()
if not user:
raise HTTPException(status_code=404, detail="Пользователь не найден")
return dict(user)
@app.get("/api/users/{target_user_id}/contact")
async def get_user_contact(
target_user_id: int,
current_user: int = Depends(get_current_user)
):
"""Получение контактных данных пользователя"""
with get_db() as conn:
cursor = conn.cursor()
cursor.execute("SELECT email, phone, telegram FROM users WHERE id = ?", (target_user_id,))
user = cursor.fetchone()
if not user:
raise HTTPException(status_code=404, detail="Пользователь не найден")
return dict(user)
@app.get("/api/health")
async def health_check():
"""Проверка работоспособности API"""
return {
"status": "ok",
"time": datetime.now().isoformat(),
"server": "Rabota.Today API"
}
# Улучшенная настройка CORS для мобильных устройств
# Настройка CORS - максимально разрешительная для мобильных
app.add_middleware(
CORSMiddleware,
allow_origins=["*"], # Разрешить всем
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
app.add_middleware(
TrustedHostMiddleware,
allowed_hosts=["yarmarka.rabota.today", "localhost", "127.0.0.1"]
)
# Добавьте middleware для обработки мобильных User-Agent
@app.middleware("http")
async def detect_mobile(request: Request, call_next):
"""Определение мобильных устройств и добавление заголовков"""
user_agent = request.headers.get("user-agent", "").lower()
# Определяем мобильное устройство
is_mobile = any([
"mobile" in user_agent,
"android" in user_agent,
"iphone" in user_agent,
"ipad" in user_agent,
"windows phone" in user_agent
])
# Добавляем информацию о мобильности в request.state
request.state.is_mobile = is_mobile
response = await call_next(request)
# Добавляем заголовки для мобильных устройств
if is_mobile:
response.headers["X-Device-Type"] = "mobile"
response.headers["Cache-Control"] = "no-cache, no-store, must-revalidate"
return response
# Инициализация базы данных
init_db()
set_initial_sequence_ids()
if __name__ == "__main__":
print("🚀 Запуск сервера на http://localhost:8000")
print("📚 Документация API: http://localhost:8000/docs")
print("👤 Админ: admin@rabota.today / admin123")
print("📁 HTML страницы должны быть в папке templates/")
uvicorn.run(app, host="0.0.0.0", port=8000)