mirror of
https://github.com/Significant-Gravitas/AutoGPT.git
synced 2026-02-08 22:05:08 -05:00
Merge branch 'master' into bently/secrt-881-find-local-businesses-using-google-maps-list-building
This commit is contained in:
@@ -0,0 +1,307 @@
|
||||
import logging
|
||||
import time
|
||||
from enum import Enum
|
||||
from typing import Optional
|
||||
|
||||
import requests
|
||||
from pydantic import Field
|
||||
|
||||
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
|
||||
from backend.data.model import BlockSecret, SchemaField, SecretField
|
||||
|
||||
|
||||
class AudioTrack(str, Enum):
    """Background music tracks offered by revid.ai.

    Each member's value is the human-readable track title; ``audio_url``
    resolves a member to its MP3 asset on the revid.ai CDN.

    Fix: members were previously assigned one-element tuples
    (``OBSERVER = ("Observer",)``).  With the ``str`` mixin the tuple is
    unpacked into the constructor so the value happened to be correct, but the
    trailing commas are misleading and fragile — plain strings are used now.
    """

    OBSERVER = "Observer"
    FUTURISTIC_BEAT = "Futuristic Beat"
    SCIENCE_DOCUMENTARY = "Science Documentary"
    HOTLINE = "Hotline"
    BLADERUNNER_2049 = "Bladerunner 2049"
    A_FUTURE = "A Future"
    ELYSIAN_EMBERS = "Elysian Embers"
    INSPIRING_CINEMATIC = "Inspiring Cinematic"
    BLADERUNNER_REMIX = "Bladerunner Remix"
    IZZAMUZZIC = "Izzamuzzic"
    NAS = "Nas"
    PARIS_ELSE = "Paris - Else"
    SNOWFALL = "Snowfall"
    BURLESQUE = "Burlesque"
    CORNY_CANDY = "Corny Candy"
    HIGHWAY_NOCTURNE = "Highway Nocturne"
    I_DONT_THINK_SO = "I Don't Think So"
    LOSING_YOUR_MARBLES = "Losing Your Marbles"
    REFRESHER = "Refresher"
    TOURIST = "Tourist"
    TWIN_TYCHES = "Twin Tyches"

    @property
    def audio_url(self) -> str:
        """Return the CDN URL of this track's MP3 file."""
        audio_urls = {
            AudioTrack.OBSERVER: "https://cdn.tfrv.xyz/audio/observer.mp3",
            AudioTrack.FUTURISTIC_BEAT: "https://cdn.tfrv.xyz/audio/_futuristic-beat.mp3",
            AudioTrack.SCIENCE_DOCUMENTARY: "https://cdn.tfrv.xyz/audio/_science-documentary.mp3",
            AudioTrack.HOTLINE: "https://cdn.tfrv.xyz/audio/_hotline.mp3",
            AudioTrack.BLADERUNNER_2049: "https://cdn.tfrv.xyz/audio/_bladerunner-2049.mp3",
            AudioTrack.A_FUTURE: "https://cdn.tfrv.xyz/audio/a-future.mp3",
            AudioTrack.ELYSIAN_EMBERS: "https://cdn.tfrv.xyz/audio/elysian-embers.mp3",
            AudioTrack.INSPIRING_CINEMATIC: "https://cdn.tfrv.xyz/audio/inspiring-cinematic-ambient.mp3",
            AudioTrack.BLADERUNNER_REMIX: "https://cdn.tfrv.xyz/audio/bladerunner-remix.mp3",
            AudioTrack.IZZAMUZZIC: "https://cdn.tfrv.xyz/audio/_izzamuzzic.mp3",
            AudioTrack.NAS: "https://cdn.tfrv.xyz/audio/_nas.mp3",
            AudioTrack.PARIS_ELSE: "https://cdn.tfrv.xyz/audio/_paris-else.mp3",
            AudioTrack.SNOWFALL: "https://cdn.tfrv.xyz/audio/_snowfall.mp3",
            AudioTrack.BURLESQUE: "https://cdn.tfrv.xyz/audio/burlesque.mp3",
            AudioTrack.CORNY_CANDY: "https://cdn.tfrv.xyz/audio/corny-candy.mp3",
            AudioTrack.HIGHWAY_NOCTURNE: "https://cdn.tfrv.xyz/audio/highway-nocturne.mp3",
            AudioTrack.I_DONT_THINK_SO: "https://cdn.tfrv.xyz/audio/i-dont-think-so.mp3",
            AudioTrack.LOSING_YOUR_MARBLES: "https://cdn.tfrv.xyz/audio/losing-your-marbles.mp3",
            AudioTrack.REFRESHER: "https://cdn.tfrv.xyz/audio/refresher.mp3",
            AudioTrack.TOURIST: "https://cdn.tfrv.xyz/audio/tourist.mp3",
            # NOTE(review): filename says "twin-tynches" while the track is
            # "Twin Tyches" — looks like a CDN-side typo; kept as-is since the
            # asset presumably lives at this exact path. TODO confirm.
            AudioTrack.TWIN_TYCHES: "https://cdn.tfrv.xyz/audio/twin-tynches.mp3",
        }
        return audio_urls[self]
||||
|
||||
|
||||
class GenerationPreset(str, Enum):
    """Visual-style presets accepted by revid.ai for AI-generated imagery.

    Member *names* are sent to the API (the caller uses ``.name``), while the
    values are the human-readable labels shown in the UI.

    Fix: members were previously assigned one-element tuples
    (``LEONARDO = ("Default",)``); with the ``str`` mixin the tuple is
    unpacked so the value was correct, but the trailing commas are
    misleading — plain strings are used now (behavior unchanged).
    """

    LEONARDO = "Default"
    ANIME = "Anime"
    REALISM = "Realist"
    ILLUSTRATION = "Illustration"
    SKETCH_COLOR = "Sketch Color"
    SKETCH_BW = "Sketch B&W"
    PIXAR = "Pixar"
    INK = "Japanese Ink"
    RENDER_3D = "3D Render"
    LEGO = "Lego"
    SCIFI = "Sci-Fi"
    # NOTE(review): "RECRO" looks like a typo for "RETRO", but the member
    # *name* is what gets sent to the API / referenced by callers, so it is
    # deliberately kept unchanged for compatibility.
    RECRO_CARTOON = "Retro Cartoon"
    PIXEL_ART = "Pixel Art"
    CREATIVE = "Creative"
    PHOTOGRAPHY = "Photography"
    RAYTRACED = "Raytraced"
    ENVIRONMENT = "Environment"
    FANTASY = "Fantasy"
    ANIME_SR = "Anime Realism"
    MOVIE = "Movie"
    STYLIZED_ILLUSTRATION = "Stylized Illustration"
    MANGA = "Manga"
||||
|
||||
|
||||
class Voice(str, Enum):
    """Narration voices available for the video's AI voice-over.

    The enum value is the display name; ``voice_id`` maps a member to the
    opaque provider voice identifier that the API expects.
    """

    LILY = "Lily"
    DANIEL = "Daniel"
    BRIAN = "Brian"
    JESSICA = "Jessica"
    CHARLOTTE = "Charlotte"
    CALLUM = "Callum"

    @property
    def voice_id(self) -> str:
        """Return the provider voice identifier for this voice."""
        # Keyed by display name; since the enum mixes in `str`, the member
        # itself hashes/compares equal to its value, so `self` works as a key.
        return {
            "Lily": "pFZP5JQG7iQjIQuC4Bku",
            "Daniel": "onwK4e9ZLuTAKqWW03F9",
            "Brian": "nPczCjzI2devNBz1zQrb",
            "Jessica": "cgSgspJ2msm6clMCkdW9",
            "Charlotte": "XB0fDUnXU5powFXDhCwa",
            "Callum": "N2lVS1w4EtoT3dr4eOWO",
        }[self]

    def __str__(self) -> str:
        # Render as the plain display name rather than "Voice.LILY".
        return self.value
||||
|
||||
|
||||
class VisualMediaType(str, Enum):
    """Kind of visual media used to build the video clips.

    Values are the exact ``mediaType`` strings the revid.ai API expects.

    Fix: members were previously assigned one-element tuples
    (``STOCK_VIDEOS = ("stockVideo",)``); with the ``str`` mixin the tuple is
    unpacked so the value was correct, but plain strings are clearer and
    behavior-identical.
    """

    STOCK_VIDEOS = "stockVideo"
    MOVING_AI_IMAGES = "movingImage"
    AI_VIDEO = "aiVideo"
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class AIShortformVideoCreatorBlock(Block):
    """Create a short-form video via the revid.ai rendering API.

    Flow in ``run``:
      1. register a throwaway webhook.site endpoint for render callbacks,
      2. POST a render job to revid.ai,
      3. poll the status endpoint until the video is ready, failed, or the
         polling window expires, then yield the final video URL.
    """

    class Input(BlockSchema):
        # revid.ai credential, resolved from the "revid_api_key" secret.
        api_key: BlockSecret = SecretField(
            key="revid_api_key",
            description="Your revid.ai API key",
            placeholder="Enter your revid.ai API key",
        )
        # Script text: narration plus [bracketed] visual-direction hints.
        script: str = SchemaField(
            description="""1. Use short and punctuated sentences\n\n2. Use linebreaks to create a new clip\n\n3. Text outside of brackets is spoken by the AI, and [text between brackets] will be used to guide the visual generation. For example, [close-up of a cat] will show a close-up of a cat.""",
            placeholder="[close-up of a cat] Meow!",
        )
        # Free-form aspect-ratio string passed straight to the API.
        ratio: str = Field(description="Aspect ratio of the video", default="9 / 16")
        resolution: str = Field(description="Resolution of the video", default="720p")
        frame_rate: int = Field(description="Frame rate of the video", default=60)
        # Only affects AI-generated visuals; the member *name* is sent to the API.
        generation_preset: GenerationPreset = SchemaField(
            description="Generation preset for visual style - only effects AI generated visuals",
            default=GenerationPreset.LEONARDO,
            placeholder=GenerationPreset.LEONARDO,
        )
        background_music: AudioTrack = SchemaField(
            description="Background music track",
            default=AudioTrack.HIGHWAY_NOCTURNE,
            placeholder=AudioTrack.HIGHWAY_NOCTURNE,
        )
        voice: Voice = SchemaField(
            description="AI voice to use for narration",
            default=Voice.LILY,
            placeholder=Voice.LILY,
        )
        video_style: VisualMediaType = SchemaField(
            description="Type of visual media to use for the video",
            default=VisualMediaType.STOCK_VIDEOS,
            placeholder=VisualMediaType.STOCK_VIDEOS,
        )

    class Output(BlockSchema):
        # URL of the rendered video on success.
        video_url: str = Field(description="The URL of the created video")
        # Populated instead of video_url when any step fails.
        error: Optional[str] = Field(description="Error message if the request failed")

    def __init__(self):
        super().__init__(
            id="361697fb-0c4f-4feb-aed3-8320c88c771b",
            description="Creates a shortform video using revid.ai",
            categories={BlockCategory.SOCIAL, BlockCategory.AI},
            input_schema=AIShortformVideoCreatorBlock.Input,
            output_schema=AIShortformVideoCreatorBlock.Output,
            test_input={
                "api_key": "test_api_key",
                "script": "[close-up of a cat] Meow!",
                "ratio": "9 / 16",
                "resolution": "720p",
                "frame_rate": 60,
                "generation_preset": GenerationPreset.LEONARDO,
                "background_music": AudioTrack.HIGHWAY_NOCTURNE,
                "voice": Voice.LILY,
                "video_style": VisualMediaType.STOCK_VIDEOS,
            },
            test_output=(
                "video_url",
                "https://example.com/video.mp4",
            ),
            # Network calls are mocked out so the block's self-test is offline.
            test_mock={
                "create_webhook": lambda: (
                    "test_uuid",
                    "https://webhook.site/test_uuid",
                ),
                "create_video": lambda api_key, payload: {"pid": "test_pid"},
                "wait_for_video": lambda api_key, pid, webhook_token, max_wait_time=1000: "https://example.com/video.mp4",
            },
        )

    def create_webhook(self):
        """Create a temporary webhook.site endpoint.

        Returns a ``(token, url)`` pair; the URL is handed to revid.ai as the
        render-completion callback.  NOTE(review): webhook.site is a public
        service — fine for demos, worth confirming for production use.
        """
        url = "https://webhook.site/token"
        headers = {"Accept": "application/json", "Content-Type": "application/json"}
        response = requests.post(url, headers=headers)
        response.raise_for_status()
        webhook_data = response.json()
        return webhook_data["uuid"], f"https://webhook.site/{webhook_data['uuid']}"

    def create_video(self, api_key: str, payload: dict) -> dict:
        """Submit a render job to revid.ai and return the parsed JSON response.

        Raises ``requests.HTTPError`` (via raise_for_status) on non-2xx.
        """
        url = "https://www.revid.ai/api/public/v2/render"
        headers = {"key": api_key}
        response = requests.post(url, json=payload, headers=headers)
        logger.debug(
            f"API Response Status Code: {response.status_code}, Content: {response.text}"
        )
        response.raise_for_status()
        return response.json()

    def check_video_status(self, api_key: str, pid: str) -> dict:
        """Fetch the current render status for project ``pid``."""
        url = f"https://www.revid.ai/api/public/v2/status?pid={pid}"
        headers = {"key": api_key}
        response = requests.get(url, headers=headers)
        response.raise_for_status()
        return response.json()

    def wait_for_video(
        self, api_key: str, pid: str, webhook_token: str, max_wait_time: int = 1000
    ) -> str:
        """Poll every 10s until the video is ready; return its URL.

        Raises ``ValueError`` on a reported failure and ``TimeoutError`` when
        ``max_wait_time`` seconds elapse without completion.
        """
        start_time = time.time()
        while time.time() - start_time < max_wait_time:
            status = self.check_video_status(api_key, pid)
            logger.debug(f"Video status: {status}")

            if status.get("status") == "ready" and "videoUrl" in status:
                return status["videoUrl"]
            elif status.get("status") == "error":
                error_message = status.get("error", "Unknown error occurred")
                logger.error(f"Video creation failed: {error_message}")
                raise ValueError(f"Video creation failed: {error_message}")
            # The API apparently uses two failure conventions; both are handled.
            elif status.get("status") in ["FAILED", "CANCELED"]:
                logger.error(f"Video creation failed: {status.get('message')}")
                raise ValueError(f"Video creation failed: {status.get('message')}")

            time.sleep(10)

        logger.error("Video creation timed out")
        raise TimeoutError("Video creation timed out")

    def run(self, input_data: Input, **kwargs) -> BlockOutput:
        """Yield ("video_url", url) on success or ("error", message) on failure."""
        try:
            # Create a new Webhook.site URL
            webhook_token, webhook_url = self.create_webhook()
            logger.debug(f"Webhook URL: {webhook_url}")

            audio_url = input_data.background_music.audio_url

            payload = {
                "frameRate": input_data.frame_rate,
                "resolution": input_data.resolution,
                # NOTE(review): magic constant from the revid.ai payload format;
                # semantics not documented here — presumably clip pacing.
                "frameDurationMultiplier": 18,
                "webhook": webhook_url,
                "creationParams": {
                    "mediaType": input_data.video_style,
                    "captionPresetName": "Wrap 1",
                    "selectedVoice": input_data.voice.voice_id,
                    "hasEnhancedGeneration": True,
                    # API expects the preset's member *name*, not its label.
                    "generationPreset": input_data.generation_preset.name,
                    "selectedAudio": input_data.background_music,
                    "origin": "/create",
                    "inputText": input_data.script,
                    "flowType": "text-to-video",
                    "slug": "create-tiktok-video",
                    "hasToGenerateVoice": True,
                    "hasToTranscript": False,
                    "hasToSearchMedia": True,
                    "hasAvatar": False,
                    "hasWebsiteRecorder": False,
                    "hasTextSmallAtBottom": False,
                    "ratio": input_data.ratio,
                    "sourceType": "contentScraping",
                    "selectedStoryStyle": {"value": "custom", "label": "Custom"},
                    # Only generate visuals when not using stock footage.
                    "hasToGenerateVideos": input_data.video_style
                    != VisualMediaType.STOCK_VIDEOS,
                    "audioUrl": audio_url,
                },
            }

            logger.debug("Creating video...")
            response = self.create_video(input_data.api_key.get_secret_value(), payload)
            pid = response.get("pid")

            if not pid:
                logger.error(
                    f"Failed to create video: No project ID returned. API Response: {response}"
                )
                yield "error", "Failed to create video: No project ID returned"
            else:
                logger.debug(
                    f"Video created with project ID: {pid}. Waiting for completion..."
                )
                video_url = self.wait_for_video(
                    input_data.api_key.get_secret_value(), pid, webhook_token
                )
                logger.debug(f"Video ready: {video_url}")
                yield "video_url", video_url

        # Each failure mode is surfaced on the "error" pin rather than raised.
        except requests.RequestException as e:
            logger.exception("Error creating video")
            yield "error", f"Error creating video: {str(e)}"
        except ValueError as e:
            logger.exception("Error in video creation process")
            yield "error", str(e)
        except TimeoutError as e:
            logger.exception("Video creation timed out")
            yield "error", str(e)
@@ -1,37 +1,52 @@
|
||||
from typing import Any, List, Tuple
|
||||
from typing import Any
|
||||
|
||||
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
|
||||
from backend.data.model import SchemaField
|
||||
|
||||
|
||||
class StepThroughItemsBlock(Block):
    """Iterate over a list or dictionary, emitting each element on the "item"
    pin and its position (dict key or list index) on the "key" pin.
    """

    class Input(BlockSchema):
        items: list | dict = SchemaField(
            description="The list or dictionary of items to iterate over",
            placeholder="[1, 2, 3, 4, 5] or {'key1': 'value1', 'key2': 'value2'}",
        )

    class Output(BlockSchema):
        # The current element.
        item: Any = SchemaField(description="The current item in the iteration")
        # The dict key (dict input) or zero-based index (list input).
        key: Any = SchemaField(
            description="The key or index of the current item in the iteration",
        )

    def __init__(self):
        super().__init__(
            # NOTE(review): this id contains non-hex characters (g-l), so it is
            # not a valid UUID; kept unchanged because block ids may be
            # referenced externally — confirm before fixing.
            id="f8e7d6c5-b4a3-2c1d-0e9f-8g7h6i5j4k3l",
            input_schema=StepThroughItemsBlock.Input,
            output_schema=StepThroughItemsBlock.Output,
            categories={BlockCategory.LOGIC},
            description="Iterates over a list or dictionary and outputs each item.",
            test_input={"items": [1, 2, 3, {"key1": "value1", "key2": "value2"}]},
            test_output=[
                ("item", 1),
                ("key", 0),
                ("item", 2),
                ("key", 1),
                ("item", 3),
                ("key", 2),
                ("item", {"key1": "value1", "key2": "value2"}),
                ("key", 3),
            ],
            test_mock={},
        )

    def run(self, input_data: Input, **kwargs) -> BlockOutput:
        items = input_data.items
        if isinstance(items, dict):
            # If items is a dictionary, iterate over its entries.
            # Bug fix: previously this looped over items.values() and yielded
            # the *value* on the "key" pin as well; now the actual dict key is
            # emitted.
            for key, item in items.items():
                yield "item", item
                yield "key", key
        else:
            # If items is a list, iterate over the list
            for index, item in enumerate(items):
                yield "item", item
                yield "key", index
@@ -1,3 +1,4 @@
|
||||
import ast
|
||||
import logging
|
||||
from enum import Enum
|
||||
from json import JSONDecodeError
|
||||
@@ -209,6 +210,7 @@ class AIStructuredResponseGeneratorBlock(Block):
|
||||
raise ValueError(f"Unsupported LLM provider: {provider}")
|
||||
|
||||
def run(self, input_data: Input, **kwargs) -> BlockOutput:
|
||||
logger.debug(f"Calling LLM with input data: {input_data}")
|
||||
prompt = []
|
||||
|
||||
def trim_prompt(s: str) -> str:
|
||||
@@ -622,3 +624,232 @@ class AIConversationBlock(Block):
|
||||
yield "response", response
|
||||
except Exception as e:
|
||||
yield "error", f"Error calling LLM: {str(e)}"
|
||||
|
||||
|
||||
class AIListGeneratorBlock(Block):
    """Generate a Python list from a prompt using an LLM.

    Builds a system prompt that constrains the model to reply with a bare
    Python list literal, calls the sibling ``AIStructuredResponseGeneratorBlock``,
    parses the reply with ``ast.literal_eval``, and retries (up to
    ``max_retries`` times) with an error-describing follow-up prompt when
    parsing fails.
    """

    class Input(BlockSchema):
        # Optional topic the list should focus on; when absent, the focus is
        # derived from source_data (or a random list is requested).
        focus: str | None = SchemaField(
            description="The focus of the list to generate.",
            placeholder="The top 5 most interesting news stories in the data.",
            default=None,
            advanced=False,
        )
        # Optional raw text to extract list items from.
        source_data: str | None = SchemaField(
            description="The data to generate the list from.",
            placeholder="News Today: Humans land on Mars: Today humans landed on mars. -- AI wins Nobel Prize: AI wins Nobel Prize for solving world hunger. -- New AI Model: A new AI model has been released.",
            default=None,
            advanced=False,
        )
        model: LlmModel = SchemaField(
            title="LLM Model",
            default=LlmModel.GPT4_TURBO,
            description="The language model to use for generating the list.",
            advanced=True,
        )
        # Falls back to the provider-level key from settings when empty.
        api_key: BlockSecret = SecretField(value="")
        max_retries: int = SchemaField(
            default=3,
            description="Maximum number of retries for generating a valid list.",
            ge=1,
            le=5,
        )

    class Output(BlockSchema):
        # The full parsed list, yielded once.
        generated_list: List[str] = SchemaField(description="The generated list.")
        # Each element of the list, yielded one at a time after generated_list.
        list_item: str = SchemaField(
            description="Each individual item in the list.",
        )
        error: str = SchemaField(
            description="Error message if the list generation failed."
        )

    def __init__(self):
        super().__init__(
            id="9c0b0450-d199-458b-a731-072189dd6593",
            description="Generate a Python list based on the given prompt using a Large Language Model (LLM).",
            categories={BlockCategory.AI, BlockCategory.TEXT},
            input_schema=AIListGeneratorBlock.Input,
            output_schema=AIListGeneratorBlock.Output,
            test_input={
                "focus": "planets",
                "source_data": (
                    "Zylora Prime is a glowing jungle world with bioluminescent plants, "
                    "while Kharon-9 is a harsh desert planet with underground cities. "
                    "Vortexia's constant storms power floating cities, and Oceara is a water-covered world home to "
                    "intelligent marine life. On icy Draknos, ancient ruins lie buried beneath its frozen landscape, "
                    "drawing explorers to uncover its mysteries. Each planet showcases the limitless possibilities of "
                    "fictional worlds."
                ),
                "model": LlmModel.GPT4_TURBO,
                "api_key": "test_api_key",
                "max_retries": 3,
            },
            test_output=[
                (
                    "generated_list",
                    ["Zylora Prime", "Kharon-9", "Vortexia", "Oceara", "Draknos"],
                ),
                ("list_item", "Zylora Prime"),
                ("list_item", "Kharon-9"),
                ("list_item", "Vortexia"),
                ("list_item", "Oceara"),
                ("list_item", "Draknos"),
            ],
            # The LLM call is mocked with a canned string-encoded list.
            test_mock={
                "llm_call": lambda input_data: {
                    "response": "['Zylora Prime', 'Kharon-9', 'Vortexia', 'Oceara', 'Draknos']"
                },
            },
        )

    @staticmethod
    def llm_call(
        input_data: AIStructuredResponseGeneratorBlock.Input,
    ) -> dict[str, str]:
        """Run the structured-response block and return its "response" output.

        Raises ``ValueError`` if the block finishes without yielding "response".
        """
        llm_block = AIStructuredResponseGeneratorBlock()
        for output_name, output_data in llm_block.run(input_data):
            if output_name == "response":
                logger.debug(f"Received response from LLM: {output_data}")
                return output_data
        raise ValueError("Failed to get a response from the LLM.")

    @staticmethod
    def string_to_list(string):
        """
        Converts a string representation of a list into an actual Python list object.

        Uses ``ast.literal_eval`` (safe — no arbitrary code execution) and
        raises ``ValueError`` when the string is not a valid list literal.
        """
        logger.debug(f"Converting string to list. Input string: {string}")
        try:
            # Use ast.literal_eval to safely evaluate the string
            python_list = ast.literal_eval(string)
            if isinstance(python_list, list):
                logger.debug(f"Successfully converted string to list: {python_list}")
                return python_list
            else:
                logger.error(f"The provided string '{string}' is not a valid list")
                raise ValueError(f"The provided string '{string}' is not a valid list.")
        except (SyntaxError, ValueError) as e:
            logger.error(f"Failed to convert string to list: {e}")
            raise ValueError("Invalid list format. Could not convert to list.")

    def run(self, input_data: Input, **kwargs) -> BlockOutput:
        """Yield ("generated_list", list) then one ("list_item", x) per element,
        or ("error", message) after exhausting retries."""
        logger.debug(f"Starting AIListGeneratorBlock.run with input data: {input_data}")

        # Check for API key: the block-level secret takes priority, otherwise
        # fall back to the provider key configured in settings.
        api_key_check = (
            input_data.api_key.get_secret_value()
            or LlmApiKeys[input_data.model.metadata.provider].get_secret_value()
        )
        if not api_key_check:
            logger.error("No LLM API key provided.")
            yield "error", "No LLM API key provided."
            return

        # Prepare the system prompt
        sys_prompt = """You are a Python list generator. Your task is to generate a Python list based on the user's prompt.
|Respond ONLY with a valid python list.
|The list can contain strings, numbers, or nested lists as appropriate.
|Do not include any explanations or additional text.
|
|Valid Example string formats:
|
|Example 1:
|```
|['1', '2', '3', '4']
|```
|
|Example 2:
|```
|[['1', '2'], ['3', '4'], ['5', '6']]
|```
|
|Example 3:
|```
|['1', ['2', '3'], ['4', ['5', '6']]]
|```
|
|Example 4:
|```
|['a', 'b', 'c']
|```
|
|Example 5:
|```
|['1', '2.5', 'string', 'True', ['False', 'None']]
|```
|
|Do not include any explanations or additional text, just respond with the list in the format specified above.
"""
        # If a focus is provided, add it to the prompt
        if input_data.focus:
            prompt = f"Generate a list with the following focus:\n<focus>\n\n{input_data.focus}</focus>"
        else:
            # If there's source data
            if input_data.source_data:
                prompt = "Extract the main focus of the source data to a list.\ni.e if the source data is a news website, the focus would be the news stories rather than the social links in the footer."
            else:
                # No focus or source data provided, generate a random list
                prompt = "Generate a random list."

        # If the source data is provided, add it to the prompt
        if input_data.source_data:
            prompt += f"\n\nUse the following source data to generate the list from:\n\n<source_data>\n\n{input_data.source_data}</source_data>\n\nDo not invent fictional data that is not present in the source data."
        # Else, tell the LLM to synthesize the data
        else:
            prompt += "\n\nInvent the data to generate the list from."

        for attempt in range(input_data.max_retries):
            try:
                logger.debug("Calling LLM")
                llm_response = self.llm_call(
                    AIStructuredResponseGeneratorBlock.Input(
                        sys_prompt=sys_prompt,
                        prompt=prompt,
                        api_key=input_data.api_key,
                        model=input_data.model,
                        expected_format={},  # Do not use structured response
                    )
                )

                logger.debug(f"LLM response: {llm_response}")

                # Extract Response string
                response_string = llm_response["response"]
                logger.debug(f"Response string: {response_string}")

                # Convert the string to a Python list
                logger.debug("Converting string to Python list")
                parsed_list = self.string_to_list(response_string)
                logger.debug(f"Parsed list: {parsed_list}")

                # If we reach here, we have a valid Python list
                logger.debug("Successfully generated a valid Python list")
                yield "generated_list", parsed_list

                # Yield each item in the list
                for item in parsed_list:
                    yield "list_item", item
                return

            except Exception as e:
                logger.error(f"Error in attempt {attempt + 1}: {str(e)}")
                if attempt == input_data.max_retries - 1:
                    logger.error(
                        f"Failed to generate a valid Python list after {input_data.max_retries} attempts"
                    )
                    yield "error", f"Failed to generate a valid Python list after {input_data.max_retries} attempts. Last error: {str(e)}"
                else:
                    # Add a retry prompt.
                    # NOTE(review): this embeds the previous prompt (which on
                    # later retries already contains earlier retry wrappers),
                    # so the prompt grows with each attempt — confirm intended.
                    logger.debug("Preparing retry prompt")
                    prompt = f"""
                    The previous attempt failed due to `{e}`
                    Generate a valid Python list based on the original prompt.
                    Remember to respond ONLY with a valid Python list as per the format specified earlier.
                    Original prompt:
                    ```{prompt}```

                    Respond only with the list in the format specified with no commentary or apologies.
                    """
                    logger.debug(f"Retry prompt: {prompt}")

        logger.debug("AIListGeneratorBlock.run completed")
||||
|
||||
@@ -4,7 +4,7 @@ from urllib.parse import quote
|
||||
import requests
|
||||
|
||||
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
|
||||
from backend.data.model import BlockSecret, SecretField
|
||||
from backend.data.model import BlockSecret, SchemaField, SecretField
|
||||
|
||||
|
||||
class GetRequest:
|
||||
@@ -96,6 +96,12 @@ class SearchTheWebBlock(Block, GetRequest):
|
||||
class ExtractWebsiteContentBlock(Block, GetRequest):
|
||||
class Input(BlockSchema):
|
||||
url: str # The URL to scrape
|
||||
raw_content: bool = SchemaField(
|
||||
default=False,
|
||||
title="Raw Content",
|
||||
description="Whether to do a raw scrape of the content or use Jina-ai Reader to scrape the content",
|
||||
advanced=True,
|
||||
)
|
||||
|
||||
class Output(BlockSchema):
|
||||
content: str # The scraped content from the URL
|
||||
@@ -114,21 +120,18 @@ class ExtractWebsiteContentBlock(Block, GetRequest):
|
||||
)
|
||||
|
||||
    def run(self, input_data: Input, **kwargs) -> BlockOutput:
        """Fetch a page's content; yield ("content", text) or ("error", msg).

        With raw_content the URL is fetched directly; otherwise the request is
        routed through the Jina-ai Reader proxy (r.jina.ai), which returns a
        cleaned, LLM-friendly rendition of the page.
        """
        if input_data.raw_content:
            url = input_data.url
        else:
            url = f"https://r.jina.ai/{input_data.url}"

        try:
            # get_request is provided by the GetRequest mixin; json=False
            # returns the raw response body as text.
            content = self.get_request(url, json=False)
            yield "content", content
        except requests.exceptions.HTTPError as http_err:
            # Non-2xx status from the server.
            yield "error", f"HTTP error occurred: {http_err}"
        except requests.RequestException as e:
            # Connection-level failures (DNS, timeout, etc.).
            yield "error", f"Request to URL failed: {e}"
|
||||
|
||||
|
||||
class GetWeatherInformationBlock(Block, GetRequest):
|
||||
|
||||
@@ -206,6 +206,7 @@ class Secrets(UpdateTrackingModel["Secrets"], BaseSettings):
|
||||
medium_api_key: str = Field(default="", description="Medium API key")
|
||||
medium_author_id: str = Field(default="", description="Medium author ID")
|
||||
did_api_key: str = Field(default="", description="D-ID API Key")
|
||||
revid_api_key: str = Field(default="", description="revid.ai API key")
|
||||
|
||||
discord_bot_token: str = Field(default="", description="Discord bot token")
|
||||
|
||||
|
||||
Reference in New Issue
Block a user