259 lines
8.3 KiB
Python
259 lines
8.3 KiB
Python
import json
import re
from urllib.parse import quote_plus

import requests
from fastapi import FastAPI, Form
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import HTMLResponse
from playwright.sync_api import TimeoutError, sync_playwright
|
|
|
|
app = FastAPI()

# Fully open CORS policy: any origin, any method, any header.
# NOTE(review): allow_origins=["*"] together with allow_credentials=True is
# disallowed by the CORS spec for credentialed requests (browsers reject a
# wildcard origin when credentials are sent) — confirm whether credentials
# are actually required here.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)
|
|
|
|
@app.get("/", response_class=HTMLResponse)
def frontend():
    """Serve the static HTML search form (descrizione / marca / EAN).

    The form POSTs to the /search endpoint below; no template engine is
    used, the markup is returned as a plain string.
    """
    return """
    <html>
    <body style="font-family: Arial; margin: 40px;">
        <h2>🔍 Product Search</h2>
        <form action="/search" method="post">
            <label>Descrizione:</label><br>
            <input name="descrizione" style="width:300px"><br><br>
            <label>Marca:</label><br>
            <input name="brand" style="width:300px"><br><br>
            <label>EAN:</label><br>
            <input name="ean" style="width:300px"><br><br>
            <button type="submit">Search</button>
        </form>
    </body>
    </html>
    """
|
|
|
|
def search_fanola(descrizione: str, brand: str):
    """Scrape www.fanola.it for the first product matching *descrizione* + *brand*.

    Drives headless Chromium through Playwright's sync API: runs the catalog
    search, opens the first result, clicks every info tab so lazily rendered
    panes appear, then collects title, description, ingredients, image URL,
    attribute-table values (SKU/EAN/famiglia/...) and breadcrumb categories.

    Returns:
        dict with the scraped product fields, or ``None`` when no result is
        found or any step of the scrape raises.
    """
    query = quote_plus(f"{descrizione} {brand}")
    search_url = f"https://www.fanola.it/catalogsearch/result/?q={query}"

    def extract_table_value(page, label):
        # Case-insensitive XPath lookup of a <th>label</th>/<td>value</td>
        # row in the product attribute table; "" when the row is absent.
        try:
            el = page.locator(
                f"//th[contains(translate(text(),'abcdefghijklmnopqrstuvwxyz','ABCDEFGHIJKLMNOPQRSTUVWXYZ'),"
                f"'{label.upper()}')]/following-sibling::td"
            ).first
            return el.inner_text().strip()
        except Exception:  # was a bare except: keep best-effort, stop masking KeyboardInterrupt/SystemExit
            return ""

    def extract_ingredients(page, description):
        # Try dedicated ingredient containers first. A plausible INCI list is
        # comma-separated and reasonably long, which filters false positives
        # from the broad ":has-text" selectors.
        selectors = [
            ".ingredients",
            ".inci",
            "#ingredients",
            "div[data-role='content']:has-text('INGREDIENTI')",
            "div:has-text('INGREDIENTI')"
        ]
        for sel in selectors:
            el = page.query_selector(sel)
            if el:
                txt = el.inner_text().strip()
                if "," in txt and len(txt) > 50:
                    return txt.replace("INGREDIENTI:", "").strip()

        # Fallback 1: the ingredients were embedded in the textual description.
        if "INGREDIENTI:" in description.upper():
            return description.split("INGREDIENTI:", 1)[-1].strip()

        # Fallback 2: raw-HTML regex for an INCI list starting with "Aqua (...)".
        html = page.content()
        match = re.search(r"(Aqua\s*\(.*?\).{100,2000})", html, re.I | re.S)
        return match.group(1).strip() if match else ""

    try:
        with sync_playwright() as p:
            browser = p.chromium.launch(headless=True)
            page = browser.new_page()

            page.goto(search_url, wait_until="networkidle", timeout=20000)
            page.wait_for_selector("a.product-item-link", timeout=10000)

            product_elem = page.query_selector("a.product-item-link")
            if not product_elem:
                browser.close()
                return None

            product_url = product_elem.get_attribute("href")
            page.goto(product_url, wait_until="networkidle", timeout=20000)

            # Click every accordion/tab title so hidden panes (ingredients,
            # attribute tables) are rendered before we query them.
            tabs = page.query_selector_all(
                ".product.data.items .item.title, .tabs .item.title"
            )
            for tab in tabs:
                try:
                    tab.click()
                    page.wait_for_timeout(300)
                except Exception:  # some matched elements are not clickable; best-effort
                    pass

            title_el = page.query_selector("span.base") or page.query_selector("h1.page-title span")
            product_title = title_el.inner_text().strip() if title_el else descrizione

            desc_el = page.query_selector("div.product.attribute.description")
            description = desc_el.inner_text().strip() if desc_el else ""

            ingredients = extract_ingredients(page, description)

            try:
                img_el = page.query_selector("img.fotorama__img")
                product_image = img_el.get_attribute("src") if img_el else ""
            except Exception:
                product_image = ""

            sku = extract_table_value(page, "SKU")
            barcode = extract_table_value(page, "EAN")
            famiglia = extract_table_value(page, "Famiglia")
            s_famiglia = extract_table_value(page, "Sottofamiglia")
            ss_famiglia = extract_table_value(page, "SSottofamiglia")

            # Map the breadcrumb trail onto the gruppo/linea hierarchy.
            # Index 0 is skipped (presumably the "Home" crumb — verify).
            gruppo = linea = s_linea = ss_linea = ""
            try:
                crumbs = page.locator("ul.items li").all_inner_texts()
                crumbs = [c.strip() for c in crumbs if c.strip()]
                if len(crumbs) > 1:
                    gruppo = crumbs[1]
                if len(crumbs) > 2:
                    linea = crumbs[2]
                if len(crumbs) > 3:
                    s_linea = crumbs[3]
                if len(crumbs) > 4:
                    ss_linea = crumbs[4]
            except Exception:
                pass

            browser.close()

            return {
                "url": product_url,
                "codice": sku,
                "descrizione": product_title,
                "marca": brand.title() if brand else "",
                "linea": linea,
                "s_linea": s_linea,
                "ss_linea": ss_linea,
                "gruppo": gruppo,
                "s_gruppo": "",
                "famiglia": famiglia,
                "s_famiglia": s_famiglia,
                "ss_famiglia": ss_famiglia,
                "descrizione_articolo": description,
                "ingredienti": ingredients,
                "barcode": barcode,
                "product_link": product_url,
                "image_70x70": product_image
            }

    except Exception as e:
        # Any scraping failure (timeout, navigation error, selector miss)
        # degrades to "not found" for the caller.
        print("Scraper error:", e)
        return None
|
|
|
|
def clean_newlines(product_data):
    """Convert newlines to ``<br>`` in the long-text fields, in place.

    Only ``descrizione_articolo`` and ``ingredienti`` are touched, and only
    when they hold a non-empty value. Returns the (mutated) dict so the call
    can be chained.
    """
    for field in ("descrizione_articolo", "ingredienti"):
        value = product_data.get(field)
        if value:
            product_data[field] = value.replace("\n", "<br>")
    return product_data
|
|
|
|
|
|
def extract_product_json(scraped):
    """Normalize scraped product data into the fixed schema via a local LLM.

    Sends the scraped dict to the chat endpoint at 192.168.2.207:8080 and
    parses the JSON object out of the model's reply, tolerating a markdown
    code fence and surrounding prose.

    Returns:
        dict following the schema, or an ``{"error": ..., "raw": ...,
        "details": ...}`` dict when the request or JSON parsing fails.
    """
    schema = {
        "codice": "",
        "descrizione": "",
        "marca": "",
        "linea": "",
        "s_linea": "",
        "ss_linea": "",
        "gruppo": "",
        "s_gruppo": "",
        "famiglia": "",
        "s_famiglia": "",
        "ss_famiglia": "",
        "descrizione_articolo": "",
        "ingredienti": "",
        "barcode": "",
        "product_link": "",
        "image_70x70": ""
    }

    prompt = f"""
You are a data normalization system. Use ONLY the scraped data. Do NOT invent anything.

DATI ESTRATTI:
{json.dumps(scraped, indent=2)}

RISPONDI CON SOLO JSON valido che segue esattamente questo schema:
{json.dumps(schema, indent=2)}
"""

    # Initialize up front so the except-branch payload below can never raise
    # NameError when the HTTP call itself fails before `content` is assigned.
    content = ""
    try:
        # BUG FIX: requests.post() has no `model`/`messages` parameters — the
        # original call raised TypeError on every invocation. The chat payload
        # belongs in the JSON request body. "stream": False requests a single
        # non-streamed reply, matching the `message.content` parsing below
        # (Ollama-style chat API — confirm against the actual server).
        res = requests.post(
            "http://192.168.2.207:8080/",
            json={
                "model": "llama3.1:latest",
                "messages": [{"role": "user", "content": prompt}],
                "stream": False,
            },
        )

        res.raise_for_status()
        content = res.json().get("message", {}).get("content", "").strip()

        # Strip a markdown code fence if the model wrapped its answer.
        if content.startswith("```"):
            content = content.split("```")[1].strip()

        # Keep only the outermost {...} in case the model added prose around it.
        start = content.find("{")
        end = content.rfind("}") + 1
        json_str = content[start:end]

        return json.loads(json_str)

    except Exception as e:
        return {"error": "Invalid JSON", "raw": content, "details": str(e)}
|
|
|
|
|
|
@app.post("/search")
def search_product(descrizione: str = Form(...),
                   brand: str = Form(...),
                   ean: str = Form("")):
    """POST /search: scrape fanola.it, normalize via LLM, return JSON.

    Form fields: ``descrizione`` and ``brand`` drive the scrape; ``ean`` is
    only echoed back in the response. On any failure the response still has
    the same shape, with an error dict in ``product_data``.
    """
    # Built once; the original duplicated this expression in both return paths.
    search_url = (
        "https://www.fanola.it/catalogsearch/result/?q="
        + quote_plus(descrizione + " " + brand)
    )
    requested = {"descrizione": descrizione, "brand": brand, "ean": ean}

    scraped = search_fanola(descrizione, brand)
    if not scraped:
        return {
            "requested": requested,
            "product_data": {"error": "Product not found"},
            "search_url": search_url,
        }

    product_data = extract_product_json(scraped)

    # Consistency fix: reuse the clean_newlines helper instead of repeating
    # the same <br> substitution inline (same two keys, same truthiness guard).
    if product_data and "error" not in product_data:
        product_data = clean_newlines(product_data)

    return {
        "requested": requested,
        "product_data": product_data,
        "search_url": search_url,
    }
|