# Source snapshot metadata: 2025-12-10 15:00:45 +01:00 — 259 lines, 8.3 KiB, Python.
from fastapi import FastAPI, Form
from fastapi.responses import HTMLResponse
from fastapi.middleware.cors import CORSMiddleware
from urllib.parse import quote_plus
from playwright.sync_api import sync_playwright, TimeoutError
import json
import requests
# FastAPI application instance: serves the HTML form at "/" and the scraper at "/search".
app = FastAPI()
# NOTE(review): wildcard origins combined with allow_credentials=True is the most
# permissive CORS setup; browsers refuse credentialed requests against a literal
# "*" origin, so credentials may not actually work cross-origin — confirm intent.
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.get("/", response_class=HTMLResponse)
def frontend():
    """Serve the minimal HTML search form that posts to the /search endpoint."""
    page_markup = """
<html>
<body style="font-family: Arial; margin: 40px;">
<h2>🔍 Product Search</h2>
<form action="/search" method="post">
<label>Descrizione:</label><br>
<input name="descrizione" style="width:300px"><br><br>
<label>Marca:</label><br>
<input name="brand" style="width:300px"><br><br>
<label>EAN:</label><br>
<input name="ean" style="width:300px"><br><br>
<button type="submit">Search</button>
</form>
</body>
</html>
"""
    return page_markup
def search_fanola(descrizione: str, brand: str):
    """Scrape fanola.it for the first product matching *descrizione* + *brand*.

    Drives a headless Chromium via Playwright: runs the catalog search, opens
    the first product hit, expands its info tabs, and extracts title,
    description, ingredients, SKU/EAN, family fields and breadcrumb categories.

    Returns a dict of product fields, or None when no product is found or the
    scrape fails for any reason (errors are printed, never raised).
    """
    import re  # hoisted from the nested helper; used by the ingredient fallback

    query = quote_plus(f"{descrizione} {brand}")
    search_url = f"https://www.fanola.it/catalogsearch/result/?q={query}"

    def extract_table_value(page, label):
        # Case-insensitive XPath lookup of a <th>LABEL</th><td>value</td> spec row.
        try:
            el = page.locator(
                f"//th[contains(translate(text(),'abcdefghijklmnopqrstuvwxyz','ABCDEFGHIJKLMNOPQRSTUVWXYZ'),"
                f"'{label.upper()}')]/following-sibling::td"
            ).first
            return el.inner_text().strip()
        except Exception:  # was a bare except: don't swallow KeyboardInterrupt/SystemExit
            return ""

    def extract_ingredients(page, description):
        # Strategy: known CSS selectors first, then the product description
        # text, then a raw regex over the page HTML as a last resort.
        selectors = [
            ".ingredients",
            ".inci",
            "#ingredients",
            "div[data-role='content']:has-text('INGREDIENTI')",
            "div:has-text('INGREDIENTI')"
        ]
        for sel in selectors:
            el = page.query_selector(sel)
            if el:
                txt = el.inner_text().strip()
                # Heuristic: a real INCI list is comma-separated and long.
                if "," in txt and len(txt) > 50:
                    return txt.replace("INGREDIENTI:", "").strip()
        if "INGREDIENTI:" in description.upper():
            return description.split("INGREDIENTI:", 1)[-1].strip()
        html = page.content()
        match = re.search(r"(Aqua\s*\(.*?\).{100,2000})", html, re.I | re.S)
        return match.group(1).strip() if match else ""

    try:
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            try:
                page = browser.new_page()
                page.goto(search_url, wait_until="networkidle", timeout=20000)
                page.wait_for_selector("a.product-item-link", timeout=10000)
                product_elem = page.query_selector("a.product-item-link")
                if not product_elem:
                    return None
                product_url = product_elem.get_attribute("href")
                page.goto(product_url, wait_until="networkidle", timeout=20000)
                # Click every info tab so lazily rendered content becomes visible.
                tabs = page.query_selector_all(
                    ".product.data.items .item.title, .tabs .item.title"
                )
                for t in tabs:
                    try:
                        t.click()
                        page.wait_for_timeout(300)
                    except Exception:
                        pass  # a tab that fails to open is non-fatal
                title_el = page.query_selector("span.base") or page.query_selector("h1.page-title span")
                product_title = title_el.inner_text().strip() if title_el else descrizione
                desc_el = page.query_selector("div.product.attribute.description")
                description = desc_el.inner_text().strip() if desc_el else ""
                ingredients = extract_ingredients(page, description)
                try:
                    img_el = page.query_selector("img.fotorama__img")
                    product_image = img_el.get_attribute("src") if img_el else ""
                except Exception:
                    product_image = ""
                sku = extract_table_value(page, "SKU")
                barcode = extract_table_value(page, "EAN")
                famiglia = extract_table_value(page, "Famiglia")
                s_famiglia = extract_table_value(page, "Sottofamiglia")
                ss_famiglia = extract_table_value(page, "SSottofamiglia")
                gruppo = linea = s_linea = ss_linea = ""
                try:
                    # Breadcrumb positions: [home, gruppo, linea, s_linea, ss_linea].
                    crumbs = page.locator("ul.items li").all_inner_texts()
                    crumbs = [c.strip() for c in crumbs if c.strip()]
                    if len(crumbs) > 1:
                        gruppo = crumbs[1]
                    if len(crumbs) > 2:
                        linea = crumbs[2]
                    if len(crumbs) > 3:
                        s_linea = crumbs[3]
                    if len(crumbs) > 4:
                        ss_linea = crumbs[4]
                except Exception:
                    pass  # breadcrumbs are optional metadata
                return {
                    "url": product_url,
                    "codice": sku,
                    "descrizione": product_title,
                    "marca": brand.title() if brand else "",
                    "linea": linea,
                    "s_linea": s_linea,
                    "ss_linea": ss_linea,
                    "gruppo": gruppo,
                    "s_gruppo": "",
                    "famiglia": famiglia,
                    "s_famiglia": s_famiglia,
                    "ss_famiglia": ss_famiglia,
                    "descrizione_articolo": description,
                    "ingredienti": ingredients,
                    "barcode": barcode,
                    "product_link": product_url,
                    "image_70x70": product_image
                }
            finally:
                # Release the browser on every path; the original only closed it
                # on success and leaked it until sync_playwright() unwound.
                browser.close()
    except Exception as e:
        print("Scraper error:", e)
        return None
def clean_newlines(product_data):
    """Replace newlines with <br> tags in the long-text fields, in place.

    Only "descrizione_articolo" and "ingredienti" are touched; empty or
    missing values are left alone.  Returns the (mutated) dict.
    """
    for field in ("descrizione_articolo", "ingredienti"):
        value = product_data.get(field)
        if value:
            product_data[field] = value.replace("\n", "<br>")
    return product_data
def extract_product_json(scraped):
    """Normalize scraped product data into the fixed schema via a local LLM.

    Sends the scraped dict to an Ollama-style chat endpoint and parses the
    JSON object out of the model's reply (stripping an optional Markdown
    code fence).  Returns the parsed dict, or an error dict with keys
    "error"/"raw"/"details" when the request or JSON extraction fails.
    """
    schema = {
        "codice": "",
        "descrizione": "",
        "marca": "",
        "linea": "",
        "s_linea": "",
        "ss_linea": "",
        "gruppo": "",
        "s_gruppo": "",
        "famiglia": "",
        "s_famiglia": "",
        "ss_famiglia": "",
        "descrizione_articolo": "",
        "ingredienti": "",
        "barcode": "",
        "product_link": "",
        "image_70x70": ""
    }
    prompt = f"""
You are a data normalization system. Use ONLY the scraped data. Do NOT invent anything.
DATI ESTRATTI:
{json.dumps(scraped, indent=2)}
RISPONDI CON SOLO JSON valido che segue esattamente questo schema:
{json.dumps(schema, indent=2)}
"""
    # Initialize so the except clause below can always report it; the original
    # raised NameError on "raw": content when the POST itself failed.
    content = ""
    try:
        # BUG FIX: requests.post() has no `model`/`messages` parameters — the
        # original call raised TypeError on every invocation.  The chat payload
        # must be sent as the JSON request body instead.
        res = requests.post(
            "http://192.168.2.207:8080/",
            json={
                "model": "llama3.1:latest",
                "messages": [{"role": "user", "content": prompt}],
            },
        )
        res.raise_for_status()
        content = res.json().get("message", {}).get("content", "").strip()
        if content.startswith("```"):
            # Strip a Markdown code fence wrapping the JSON payload.
            content = content.split("```")[1].strip()
        # Extract the outermost {...} span in case the model added prose.
        start = content.find("{")
        end = content.rfind("}") + 1
        json_str = content[start:end]
        return json.loads(json_str)
    except Exception as e:
        return {"error": "Invalid JSON", "raw": content, "details": str(e)}
@app.post("/search")
def search_product(descrizione: str = Form(...),
                   brand: str = Form(...),
                   ean: str = Form("")):
    """Scrape fanola.it for the requested product and return normalized data.

    Form fields: descrizione (product description), brand, and an optional
    EAN (echoed back, not used for searching).  Always returns a dict with
    "requested", "product_data" and "search_url" keys; on a scrape miss,
    product_data carries {"error": "Product not found"}.
    """
    # Build the search URL once instead of three identical inline copies.
    search_url = f"https://www.fanola.it/catalogsearch/result/?q={quote_plus(descrizione + ' ' + brand)}"
    requested = {"descrizione": descrizione, "brand": brand, "ean": ean}
    scraped = search_fanola(descrizione, brand)
    if not scraped:
        return {
            "requested": requested,
            "product_data": {"error": "Product not found"},
            "search_url": search_url
        }
    product_data = extract_product_json(scraped)
    if product_data and "error" not in product_data:
        # Reuse the shared helper instead of duplicating its <br> logic inline.
        product_data = clean_newlines(product_data)
    return {
        "requested": requested,
        "product_data": product_data,
        "search_url": search_url
    }