106 lines
		
	
	
		
			3.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			106 lines
		
	
	
		
			3.2 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import pathlib
 | |
| from typing import Union, Annotated
 | |
| import uvicorn
 | |
| from routes.user_routes import user_route
 | |
| from bearer.routes.auth import auth
 | |
| from db.DB import DBException
 | |
| import mimetypes
 | |
| from mime.mime import mime_content_type
 | |
| from starlette.middleware import Middleware
 | |
| from starlette.middleware.cors import CORSMiddleware
 | |
| 
 | |
| from fastapi import FastAPI, File, UploadFile, Form, Response, status, HTTPException
 | |
| 
 | |
| import os
 | |
| import magic
 | |
| 
 | |
# CORS policy applied to every route: any origin, method and header is
# accepted, and credentialed requests are allowed.
_cors_options = {
    "allow_origins": ['*'],
    "allow_credentials": True,
    "allow_methods": ['*'],
    "allow_headers": ['*'],
}

# Middleware stack handed to the application at construction time.
middleware = [Middleware(CORSMiddleware, **_cors_options)]

app = FastAPI(middleware=middleware)
 | |
| 
 | |
| # app.add_middleware(
 | |
| #     CORSMiddleware,
 | |
| #     allow_origins=['*'],
 | |
| #     allow_credentials=True,
 | |
| #     allow_methods=['*'],
 | |
| #     allow_headers=['*'],
 | |
| #     max_age=3600
 | |
| # )
 | |
| 
 | |
# Register the imported routers on the application, in the original order.
for _router in (user_route, auth):
    app.include_router(_router)
 | |
| 
 | |
| 
 | |
@app.get("/")
async def read_root():
    """Return the fixed greeting payload served at the root path."""
    greeting = {"Hello": "World"}
    return greeting
 | |
| 
 | |
| 
 | |
@app.get("/items/{name}/{version}/{file}")
async def read_item(version: str, name: str, file: str = 'index.js'):
    """Serve a single file from a stored package version.

    The file is looked up at ``packages/<name>/<version>/<file>`` relative to
    the current working directory.

    Args:
        version: Version directory of the package.
        name: Package directory name.
        file: File name inside the version directory. The route always
            supplies this segment, so the default only matters when the
            function is called directly.

    Returns:
        A ``Response`` carrying the raw file bytes, the media type reported
        by ``mime_content_type`` and a permissive CORS header.

    Raises:
        HTTPException: 404 when the path does not point to a regular file
            inside the ``packages`` directory.
    """
    file_path = f"packages/{name}/{version}/{file}"

    # Resolve both ends so that ".." segments or symlinks cannot be used to
    # read files that live outside the packages directory.
    packages_root = pathlib.Path("packages").resolve()
    target = (packages_root / name / version / file).resolve()

    # is_file() (rather than exists()) also rejects directories, which would
    # otherwise make open() fail and surface as a 500.
    if not target.is_relative_to(packages_root) or not target.is_file():
        raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail="File not found")

    # Only work out the media type once we know there is something to serve.
    file_mime = mime_content_type(file_path)
    with open(target, 'rb') as f:
        content = f.read()

    return Response(content=content, media_type=file_mime, headers={"Access-Control-Allow-Origin": "*"})
 | |
| 
 | |
| 
 | |
@app.post("/files/")
async def create_file(file: Annotated[bytes | None, File()] = None):
    """Report the size in bytes of an uploaded file body.

    Args:
        file: Raw uploaded bytes, or ``None`` when no file part was sent.

    Returns:
        ``{"file_size": <int>}`` for a non-empty upload, otherwise
        ``{"message": "No file sent"}`` (missing and empty uploads are
        treated the same).
    """
    if file:
        return {"file_size": len(file)}
    return {"message": "No file sent"}
 | |
| 
 | |
| 
 | |
def _is_plain_name(value: str) -> bool:
    """Return True when *value* is usable as exactly one path component."""
    if not value or value in {".", ".."}:
        return False
    return not any(sep in value for sep in ("/", "\\", "\0"))


@app.post("/uploadfile/")
async def create_upload_file(file: UploadFile | None, package: Annotated[str, Form()], version: Annotated[str, Form()],
                             filename: Annotated[str, Form()], stable: Annotated[bool, Form()]):
    """Store an uploaded file for a package version.

    The upload is written to ``packages/<package>/<version>/<filename>`` and
    mirrored to ``packages/<package>/latest/<filename>``; when ``stable`` is
    true it is also mirrored to ``packages/<package>/stable/<filename>``.
    Existing files with the same name are replaced.

    Args:
        file: The uploaded file.
        package: Package directory name.
        version: Version directory name.
        filename: Name under which the upload is stored.
        stable: Whether to also publish the upload as the stable copy.

    Returns:
        ``{"message": ...}`` describing success, or describing the error
        when reading or writing fails.

    Raises:
        HTTPException: 400 when no file was sent, or when ``package``,
            ``version`` or ``filename`` is not a plain path component.
    """
    if file is None:
        # Without this guard the cleanup below would fail on ``None``.
        raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="No file sent")

    try:
        # These values come straight from the form and become path segments,
        # so reject separators and "." / ".." to keep writes inside packages/.
        for label, value in (("package", package), ("version", version), ("filename", filename)):
            if not _is_plain_name(value):
                raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail=f"Invalid {label}")

        contents = file.file.read()
        dir_file = "packages/{package}/{version}".format(package=package, version=version)
        dir_last = "packages/{package}/latest".format(package=package)
        dir_stable = "packages/{package}/stable".format(package=package)

        # All three directories are created even for non-stable uploads,
        # matching the layout earlier versions of this endpoint produced.
        for directory in (dir_file, dir_last, dir_stable):
            os.makedirs(directory, exist_ok=True)

        targets = [dir_file, dir_last]
        if stable:
            targets.append(dir_stable)

        for directory in targets:
            # Binary mode writes the uploaded bytes verbatim; 'wb' truncates,
            # so no separate remove-before-write step is needed.
            with open(f"{directory}/{filename}", 'wb') as f:
                f.write(contents)
    except OSError as err:
        return {"message": "There was an error uploading the file, error {err}".format(err=err)}
    finally:
        file.file.close()

    return {"message": f"Successfully uploaded {file.filename}"}
 | |
| 
 | |
| 
 | |
if __name__ == "__main__":
    # Start uvicorn when the file is executed directly. The "server:app"
    # import string presumably refers to this module being server.py --
    # TODO confirm against the actual file name.
    uvicorn.run(
        "server:app",
        port=5044,
        log_level="info",
    )
 |