import flet as ft
import cv2
import tempfile
import os
import requests
import threading
import time
import base64
def main(page: ft.Page):
page.title = "Azure Custom Vision Image Capture"
page.vertical_alignment = ft.MainAxisAlignment.CENTER
page.horizontal_alignment = ft.CrossAxisAlignment.CENTER
page
.padding
= 0 # Ensure there
is no padding around the edges of the page
page.scroll = ft.ScrollMode.ALWAYS # Allow scrolling for any overflow
title_text = ft.Text("NITTO CORP", size=50, weight=ft.FontWeight.BOLD, text_align=ft.TextAlign.CENTER)
camera_image = ft.Image(width=640, height=480, fit=ft.ImageFit.CONTAIN)
capture_button = ft.ElevatedButton("Capture", bgcolor=ft.colors.BLUE, color=ft.colors.WHITE)
upload_button = ft.FilePicker(on_result=lambda e: upload_image(e))
upload_button_trigger = ft.ElevatedButton("Upload", bgcolor=ft.colors.ORANGE, color=ft.colors.WHITE, on_click=lambda e: upload_button.pick_files(allow_multiple=False))
cancel_button
= ft
.ElevatedButton
("Cancel"
, bgcolor
=ft
.colors
.RED
, color
=ft
.colors
.WHITE
, disabled
=True) send_button
= ft
.ElevatedButton
("Send to Azure"
, bgcolor
=ft
.colors
.GREEN
, color
=ft
.colors
.WHITE
, disabled
=True) result_text = ft.Text("", text_align=ft.TextAlign.CENTER)
temp_image_path = None
cap = None
stop_thread = False
is_uploaded_image = False
def update_camera_image():
nonlocal cap, stop_thread, stream_active, is_uploaded_image
try:
cap = cv2.VideoCapture(0)
if not cap.isOpened():
result_text
.value
= "Error
: Could not
open Camera"
page.update()
return
while not stop_thread and stream_active and not is_uploaded_image:
if ret:
_, imencode_image = cv2.imencode(".jpg", frame)
base64_image = base64.b64encode(imencode_image.tobytes()).decode("utf-8")
camera_image.src_base64 = base64_image
page.update()
time.sleep(0.03)
cap.release()
except Exception as e:
print(f"Camera stream error: {e}")
def capture_image(e):
nonlocal temp_image_path, cap, stop_thread, stream_active, is_uploaded_image
is_uploaded_image = False
time.sleep(1)
try:
if ret:
with tempfile.NamedTemporaryFile(suffix=".jpg", delete=False) as temp_file:
temp_image_path = temp_file.name
cv2.imwrite(temp_image_path, frame)
with
open(temp_image_path
, "rb"
) as image_file
: base64_image
= base64
.b64encode
(image_file
.read()).decode
("utf
-8"
) camera_image.src_base64 = base64_image
send_button.disabled = False
cancel_button.disabled = False
stream_active = False
result_text.value = "Image Captured"
else:
result_text.value = "Error: Could not capture image."
except Exception as ex:
result_text.value = f"An error occurred: {ex}"
page.update()
def upload_image(e):
nonlocal temp_image_path, is_uploaded_image, stream_active
if e.files:
temp_image_path = e.files[0].path
with
open(temp_image_path
, "rb"
) as image_file
: base64_image
= base64
.b64encode
(image_file
.read()).decode
("utf
-8"
) camera_image.src_base64 = base64_image
send_button.disabled = False
cancel_button.disabled = False
result_text.value = "Image Uploaded"
stream_active = False # Stop camera preview
else:
result_text.value = "No file selected."
page.update()
def send_to_azure(e):
nonlocal temp_image_path, stream_active, is_uploaded_image
if not temp_image_path:
result_text.value = "No image captured to send."
page.update()
return
try:
prediction_endpoint = "https://w...content-available-to-author-only...e.com/customvision/v3.0/Prediction/8df0db3e-dd98-4f2a-99f7-a50b767905e6/classify/iterations/Iteration2/image"
prediction_key = "64RvUlO1xbEvmvZwk5cHxmhbRjaN0Qy6A8YEZAzaoNIB9ze4BHsDJQQJ99BCACi0881XJ3w3AAAIACOGFGRQ"
with
open(temp_image_path
, "rb"
) as image_file
: image_data
= image_file
.read() headers = {
"Prediction-Key": prediction_key,
"Content-Type": "application/octet-stream",
}
response = requests.post(prediction_endpoint, headers=headers, data=image_data)
response.raise_for_status()
result_text.value = f"Azure response: {response.json()}"
except requests.exceptions.RequestException as ex:
result_text.value = f"Error Sending to Azure: {ex}"
except Exception as ex:
result_text.value = f"An unexpected error occurred: {ex}"
page.update()
def cancel_capture(e):
nonlocal temp_image_path, stream_active, is_uploaded_image
if temp_image_path and os.path.exists(temp_image_path):
os.remove(temp_image_path)
temp_image_path = None
camera_image.src_base64 = ""
send_button
.disabled
= True cancel_button
.disabled
= True result_text.value = "Capture Cancelled"
is_uploaded_image = False
threading
.Thread
(target
=update_camera_image
, daemon
=True).start
() page.update()
# Set the background image path
background_image_path = "C:/Users/RoboController/Downloads/名称未設定のデザイン (2).jpg"
# Create a container for the background image
background_image = ft.Image(src=background_image_path, fit=ft.ImageFit.COVER)
# Function to update background image size based on page size
def update_background():
background_image.width = page.width
background_image.height = page.height
page.update()
#
Call update_background when the page
is resized
page.on_resize = lambda e: update_background()
# Create a stack to layer the background image and the other controls
stack = ft.Stack(
controls=[
ft.Container(
content=background_image,
width=page.width,
height=page.height,
),
ft.Column([
title_text,
camera_image,
ft.Row([
capture_button,
upload_button_trigger,
cancel_button,
send_button,
], alignment=ft.MainAxisAlignment.CENTER),
result_text,
upload_button,
], horizontal_alignment=ft.CrossAxisAlignment.CENTER)
]
)
page.add(stack)
# Set initial background size
update_background()
threading
.Thread
(target
=update_camera_image
, daemon
=True).start
()
ft.app(target=main)
