Compare commits

...

36 Commits

Author SHA1 Message Date
Zamil Majdy
b0ce5524ce Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into aryshare-revid 2025-05-20 17:49:04 +01:00
Reinier van der Leer
52b6d9696b fix validation errors for UserIntegrations 2025-05-20 17:29:57 +01:00
Zamil Majdy
a50ef04104 add image upload for requesst 2025-05-20 17:24:53 +01:00
Zamil Majdy
fa30bed042 Merge branch 'dev' of github.com:Significant-Gravitas/AutoGPT into aryshare-revid 2025-05-20 15:05:13 +01:00
Zamil Majdy
88360a9bf2 Merge branch 'Torantulino-patch-1' of github.com:Significant-Gravitas/AutoGPT into aryshare-revid 2025-05-20 14:18:46 +01:00
Zamil Majdy
b134bae980 Merge branch 'swiftyos/secrt-1045-ayrshare-integration' of github.com:Significant-Gravitas/AutoGPT into aryshare-revid 2025-05-20 14:18:37 +01:00
Zamil Majdy
694f701194 fix(backend): Force process exit on execution manager cleanup 2025-05-19 16:48:34 +01:00
Bently
3e1ee56e9f Merge branch 'dev' into Torantulino-patch-1 2025-05-17 05:54:32 +01:00
SwiftyOS
ca703fa1f0 fixed type on name 2025-05-16 16:15:13 +02:00
Swifty
3c3311c506 Merge branch 'dev' into swiftyos/secrt-1045-ayrshare-integration 2025-05-16 16:11:50 +02:00
SwiftyOS
7e62ce3159 final checks 2025-05-16 16:02:17 +02:00
SwiftyOS
9509762a94 Posting now working 2025-05-16 15:19:11 +02:00
SwiftyOS
3eff0a8df2 added hidden repsonse field 2025-05-16 14:58:47 +02:00
SwiftyOS
d53da6f572 added sso flow 2025-05-16 13:01:18 +02:00
SwiftyOS
fdf4785ae2 removed credentials fixed blocks 2025-05-16 12:46:37 +02:00
SwiftyOS
f1b5128715 add to all credentials 2025-05-16 11:42:51 +02:00
SwiftyOS
d26edd8dac add getAyrshareSSOUrl api call 2025-05-16 11:41:43 +02:00
Swifty
15bc5cf228 Merge branch 'dev' into swiftyos/secrt-1045-ayrshare-integration 2025-05-16 10:28:29 +02:00
SwiftyOS
2d3842bdd1 updated blocks and cred store access 2025-05-16 10:13:40 +02:00
Bentlybro
e7dde98cf8 replace `List with list` + remove un-used webhook stuff 2025-05-15 19:45:45 +01:00
SwiftyOS
cae4a6e145 updating to using block 2025-05-15 16:42:15 +02:00
SwiftyOS
e71a422173 formatting 2025-05-15 16:27:24 +02:00
SwiftyOS
6912e3ade3 added ayrshare integrations 2025-05-15 16:20:17 +02:00
Reinier van der Leer
7d90376eb2 simplify code 2025-05-15 14:57:56 +02:00
Reinier van der Leer
993e123f1b improve code 2025-05-15 14:06:31 +02:00
Reinier van der Leer
863a9e98ec feat(backend): Managed credentials store 2025-05-15 13:18:23 +02:00
Bently
7f996745ce Merge branch 'dev' into Torantulino-patch-1 2025-05-14 23:06:19 +01:00
Bently
e93ead51a9 Merge branch 'dev' into Torantulino-patch-1 2025-05-14 15:21:20 +01:00
Bentlybro
b7a6dfdb19 format 2025-05-14 15:21:05 +01:00
Bentlybro
2ea099c265 rm time.sleep 2025-05-14 15:15:06 +01:00
Bentlybro
bf70991e10 Update to change "videoGenerationModel" to use the "base" model 2025-05-14 14:26:23 +01:00
Bentlybro
b1e6161a7d Add "marketing" to block.py BlockCategory + remove comments 2025-05-13 17:22:30 +01:00
Bently
957706a941 Merge branch 'dev' into Torantulino-patch-1 2025-05-13 10:08:33 +01:00
Toran Bruce Richards
2b9f092b5c Replace LLM Generated UUIDs 2025-05-12 22:04:18 +01:00
Toran Bruce Richards
2afdd4d207 Merge branch 'dev' into Torantulino-patch-1 2025-05-12 21:54:28 +01:00
Toran Bruce Richards
d7af738358 Add new Revid Blocks 2025-05-12 21:52:43 +01:00
17 changed files with 1670 additions and 95 deletions

View File

@@ -197,6 +197,10 @@ SMARTLEAD_API_KEY=
# ZeroBounce
ZEROBOUNCE_API_KEY=
# Ayrshare
AYRSHARE_API_KEY=
AYRSHARE_JWT_KEY=
## ===== OPTIONAL API KEYS END ===== ##
# Logging Configuration

View File

@@ -52,6 +52,7 @@ class AudioTrack(str, Enum):
REFRESHER = ("Refresher",)
TOURIST = ("Tourist",)
TWIN_TYCHES = ("Twin Tyches",)
DONT_STOP_ME_ABSTRACT_FUTURE_BASS = ("Dont Stop Me Abstract Future Bass",)
@property
def audio_url(self):
@@ -77,6 +78,7 @@ class AudioTrack(str, Enum):
AudioTrack.REFRESHER: "https://cdn.tfrv.xyz/audio/refresher.mp3",
AudioTrack.TOURIST: "https://cdn.tfrv.xyz/audio/tourist.mp3",
AudioTrack.TWIN_TYCHES: "https://cdn.tfrv.xyz/audio/twin-tynches.mp3",
AudioTrack.DONT_STOP_ME_ABSTRACT_FUTURE_BASS: "https://cdn.revid.ai/audio/_dont-stop-me-abstract-future-bass.mp3",
}
return audio_urls[self]
@@ -104,6 +106,7 @@ class GenerationPreset(str, Enum):
MOVIE = ("Movie",)
STYLIZED_ILLUSTRATION = ("Stylized Illustration",)
MANGA = ("Manga",)
DEFAULT = ("DEFAULT",)
class Voice(str, Enum):
@@ -113,6 +116,7 @@ class Voice(str, Enum):
JESSICA = "Jessica"
CHARLOTTE = "Charlotte"
CALLUM = "Callum"
EVA = "Eva"
@property
def voice_id(self):
@@ -123,6 +127,7 @@ class Voice(str, Enum):
Voice.JESSICA: "cgSgspJ2msm6clMCkdW9",
Voice.CHARLOTTE: "XB0fDUnXU5powFXDhCwa",
Voice.CALLUM: "N2lVS1w4EtoT3dr4eOWO",
Voice.EVA: "FGY2WhTYpPnrIDTdsKH5",
}
return voice_id_map[self]
@@ -139,7 +144,54 @@ class VisualMediaType(str, Enum):
logger = logging.getLogger(__name__)
class AIShortformVideoCreatorBlock(Block):
class _RevidMixin:
"""Utility mixin that bundles the shared webhook / polling helpers."""
def create_video(self, api_key: SecretStr, payload: dict) -> dict:
url = "https://www.revid.ai/api/public/v2/render"
headers = {"key": api_key.get_secret_value()}
response = requests.post(url, json=payload, headers=headers)
logger.debug(
f"API Response Status Code: {response.status_code}, Content: {response.text}"
)
return response.json()
def check_video_status(self, api_key: SecretStr, pid: str) -> dict:
url = f"https://www.revid.ai/api/public/v2/status?pid={pid}"
headers = {"key": api_key.get_secret_value()}
response = requests.get(url, headers=headers)
return response.json()
def wait_for_video(
self,
api_key: SecretStr,
pid: str,
max_wait_time: int = 3600,
) -> str:
start_time = time.time()
while time.time() - start_time < max_wait_time:
status = self.check_video_status(api_key, pid)
logger.debug(f"Video status: {status}")
if status.get("status") == "ready" and "videoUrl" in status:
return status["videoUrl"]
elif status.get("status") == "error":
error_message = status.get("error", "Unknown error occurred")
logger.error(f"Video creation failed: {error_message}")
raise ValueError(f"Video creation failed: {error_message}")
elif status.get("status") in ["FAILED", "CANCELED"]:
logger.error(f"Video creation failed: {status.get('message')}")
raise ValueError(f"Video creation failed: {status.get('message')}")
time.sleep(10)
logger.error("Video creation timed out")
raise TimeoutError("Video creation timed out")
class AIShortformVideoCreatorBlock(Block, _RevidMixin):
"""Creates a shortform texttovideo clip using stock or AI imagery."""
class Input(BlockSchema):
credentials: CredentialsMetaInput[
Literal[ProviderName.REVID], Literal["api_key"]
@@ -206,86 +258,28 @@ class AIShortformVideoCreatorBlock(Block):
"https://example.com/video.mp4",
),
test_mock={
"create_webhook": lambda: (
"test_uuid",
"https://webhook.site/test_uuid",
),
"create_video": lambda api_key, payload: {"pid": "test_pid"},
"wait_for_video": lambda api_key, pid, webhook_token, max_wait_time=1000: "https://example.com/video.mp4",
"wait_for_video": lambda api_key, pid, max_wait_time=3600: "https://example.com/video.mp4",
},
test_credentials=TEST_CREDENTIALS,
)
def create_webhook(self):
url = "https://webhook.site/token"
headers = {"Accept": "application/json", "Content-Type": "application/json"}
response = requests.post(url, headers=headers)
webhook_data = response.json()
return webhook_data["uuid"], f"https://webhook.site/{webhook_data['uuid']}"
def create_video(self, api_key: SecretStr, payload: dict) -> dict:
url = "https://www.revid.ai/api/public/v2/render"
headers = {"key": api_key.get_secret_value()}
response = requests.post(url, json=payload, headers=headers)
logger.debug(
f"API Response Status Code: {response.status_code}, Content: {response.text}"
)
return response.json()
def check_video_status(self, api_key: SecretStr, pid: str) -> dict:
url = f"https://www.revid.ai/api/public/v2/status?pid={pid}"
headers = {"key": api_key.get_secret_value()}
response = requests.get(url, headers=headers)
return response.json()
def wait_for_video(
self,
api_key: SecretStr,
pid: str,
webhook_token: str,
max_wait_time: int = 1000,
) -> str:
start_time = time.time()
while time.time() - start_time < max_wait_time:
status = self.check_video_status(api_key, pid)
logger.debug(f"Video status: {status}")
if status.get("status") == "ready" and "videoUrl" in status:
return status["videoUrl"]
elif status.get("status") == "error":
error_message = status.get("error", "Unknown error occurred")
logger.error(f"Video creation failed: {error_message}")
raise ValueError(f"Video creation failed: {error_message}")
elif status.get("status") in ["FAILED", "CANCELED"]:
logger.error(f"Video creation failed: {status.get('message')}")
raise ValueError(f"Video creation failed: {status.get('message')}")
time.sleep(10)
logger.error("Video creation timed out")
raise TimeoutError("Video creation timed out")
def run(
self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs
) -> BlockOutput:
# Create a new Webhook.site URL
webhook_token, webhook_url = self.create_webhook()
logger.debug(f"Webhook URL: {webhook_url}")
audio_url = input_data.background_music.audio_url
payload = {
"frameRate": input_data.frame_rate,
"resolution": input_data.resolution,
"frameDurationMultiplier": 18,
"webhook": webhook_url,
"webhook": None,
"creationParams": {
"mediaType": input_data.video_style,
"captionPresetName": "Wrap 1",
"selectedVoice": input_data.voice.voice_id,
"hasEnhancedGeneration": True,
"generationPreset": input_data.generation_preset.name,
"selectedAudio": input_data.background_music,
"selectedAudio": input_data.background_music.value,
"origin": "/create",
"inputText": input_data.script,
"flowType": "text-to-video",
@@ -301,7 +295,7 @@ class AIShortformVideoCreatorBlock(Block):
"selectedStoryStyle": {"value": "custom", "label": "Custom"},
"hasToGenerateVideos": input_data.video_style
!= VisualMediaType.STOCK_VIDEOS,
"audioUrl": audio_url,
"audioUrl": input_data.background_music.audio_url,
},
}
@@ -314,10 +308,354 @@ class AIShortformVideoCreatorBlock(Block):
f"Failed to create video: No project ID returned. API Response: {response}"
)
raise RuntimeError("Failed to create video: No project ID returned")
else:
logger.debug(
f"Video created with project ID: {pid}. Waiting for completion..."
)
video_url = self.wait_for_video(credentials.api_key, pid, webhook_token)
logger.debug(f"Video ready: {video_url}")
yield "video_url", video_url
logger.debug(f"Video created with project ID: {pid}. Waiting for completion...")
video_url = self.wait_for_video(credentials.api_key, pid)
logger.debug(f"Video ready: {video_url}")
yield "video_url", video_url
class AIAdMakerVideoCreatorBlock(Block, _RevidMixin):
"""Generates a 30second vertical AI advert using optional usersupplied imagery."""
class Input(BlockSchema):
credentials: CredentialsMetaInput[
Literal[ProviderName.REVID], Literal["api_key"]
] = CredentialsField(
description="Credentials for Revid.ai API access.",
)
script: str = SchemaField(
description="Short advertising copy. Line breaks create new scenes.",
placeholder="Introducing Foobar [show product photo] the gadget that does it all.",
)
ratio: str = SchemaField(description="Aspect ratio", default="9 / 16")
target_duration: int = SchemaField(
description="Desired length of the ad in seconds.", default=30
)
voice: Voice = SchemaField(
description="Narration voice", default=Voice.EVA, placeholder=Voice.EVA
)
background_music: AudioTrack = SchemaField(
description="Background track",
default=AudioTrack.DONT_STOP_ME_ABSTRACT_FUTURE_BASS,
)
input_media_urls: list[str] = SchemaField(
description="List of image URLs to feature in the advert.", default=[]
)
use_only_provided_media: bool = SchemaField(
description="Restrict visuals to supplied images only.", default=True
)
class Output(BlockSchema):
video_url: str = SchemaField(description="URL of the finished advert")
error: str = SchemaField(description="Error message on failure")
def __init__(self):
super().__init__(
id="3e3fd845-000e-457f-9f50-9f2f9e278bbd",
description="Creates an AIgenerated 30second advert (text + images)",
categories={BlockCategory.MARKETING, BlockCategory.AI},
input_schema=AIAdMakerVideoCreatorBlock.Input,
output_schema=AIAdMakerVideoCreatorBlock.Output,
test_input={
"credentials": TEST_CREDENTIALS_INPUT,
"script": "Test product launch!",
"input_media_urls": [
"https://cdn.revid.ai/uploads/1747076315114-image.png",
],
},
test_output=("video_url", "https://example.com/ad.mp4"),
test_mock={
"create_video": lambda api_key, payload: {"pid": "test_pid"},
"wait_for_video": lambda api_key, pid, max_wait_time=3600: "https://example.com/ad.mp4",
},
test_credentials=TEST_CREDENTIALS,
)
def run(self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs):
payload = {
"webhook": None,
"creationParams": {
"targetDuration": input_data.target_duration,
"ratio": input_data.ratio,
"mediaType": "aiVideo",
"inputText": input_data.script,
"flowType": "text-to-video",
"slug": "ai-ad-generator",
"slugNew": "",
"isCopiedFrom": False,
"hasToGenerateVoice": True,
"hasToTranscript": False,
"hasToSearchMedia": True,
"hasAvatar": False,
"hasWebsiteRecorder": False,
"hasTextSmallAtBottom": False,
"selectedAudio": input_data.background_music.value,
"selectedVoice": input_data.voice.voice_id,
"selectedAvatar": "https://cdn.revid.ai/avatars/young-woman.mp4",
"selectedAvatarType": "video/mp4",
"websiteToRecord": "",
"hasToGenerateCover": True,
"nbGenerations": 1,
"disableCaptions": False,
"mediaMultiplier": "medium",
"characters": [],
"captionPresetName": "Revid",
"sourceType": "contentScraping",
"selectedStoryStyle": {"value": "custom", "label": "General"},
"generationPreset": "DEFAULT",
"hasToGenerateMusic": False,
"isOptimizedForChinese": False,
"generationUserPrompt": "",
"enableNsfwFilter": False,
"addStickers": False,
"typeMovingImageAnim": "dynamic",
"hasToGenerateSoundEffects": False,
"forceModelType": "gpt-image-1",
"selectedCharacters": [],
"lang": "",
"voiceSpeed": 1,
"disableAudio": False,
"disableVoice": False,
"useOnlyProvidedMedia": input_data.use_only_provided_media,
"imageGenerationModel": "ultra",
"videoGenerationModel": "base",
"hasEnhancedGeneration": True,
"hasEnhancedGenerationPro": True,
"inputMedias": [
{"url": url, "title": "", "type": "image"}
for url in input_data.input_media_urls
],
"hasToGenerateVideos": True,
"audioUrl": input_data.background_music.audio_url,
"watermark": None,
},
}
response = self.create_video(credentials.api_key, payload)
pid = response.get("pid")
if not pid:
raise RuntimeError("Failed to create video: No project ID returned")
video_url = self.wait_for_video(credentials.api_key, pid)
yield "video_url", video_url
class AIPromptToVideoCreatorBlock(Block, _RevidMixin):
"""Turns a single creative prompt into a fully AIgenerated video."""
class Input(BlockSchema):
credentials: CredentialsMetaInput[
Literal[ProviderName.REVID], Literal["api_key"]
] = CredentialsField(description="Revid.ai API credentials")
prompt: str = SchemaField(
description="Imaginative prompt describing the desired video.",
placeholder="A neonlit cyberpunk alley with rainsoaked pavements.",
)
ratio: str = SchemaField(default="9 / 16")
prompt_target_duration: int = SchemaField(default=30)
voice: Voice = SchemaField(default=Voice.EVA)
background_music: AudioTrack = SchemaField(
default=AudioTrack.DONT_STOP_ME_ABSTRACT_FUTURE_BASS
)
class Output(BlockSchema):
video_url: str = SchemaField(description="Rendered video URL")
error: str = SchemaField(description="Error message if any")
def __init__(self):
super().__init__(
id="46f4099c-ad01-4d79-874c-37a24c937ba3",
description="Creates an AI video from a single prompt (no linebreaking script).",
categories={BlockCategory.AI, BlockCategory.SOCIAL},
input_schema=AIPromptToVideoCreatorBlock.Input,
output_schema=AIPromptToVideoCreatorBlock.Output,
test_input={
"credentials": TEST_CREDENTIALS_INPUT,
"prompt": "Epic timelapse of a city skyline from day to night",
},
test_output=("video_url", "https://example.com/prompt.mp4"),
test_mock={
"create_video": lambda api_key, payload: {"pid": "test_pid"},
"wait_for_video": lambda api_key, pid, max_wait_time=3600: "https://example.com/prompt.mp4",
},
test_credentials=TEST_CREDENTIALS,
)
def run(self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs):
payload = {
"webhook": None,
"creationParams": {
"mediaType": "aiVideo",
"flowType": "prompt-to-video",
"slug": "prompt-to-video",
"slugNew": "",
"isCopiedFrom": False,
"hasToGenerateVoice": True,
"hasToTranscript": False,
"hasToSearchMedia": True,
"hasAvatar": False,
"hasWebsiteRecorder": False,
"hasTextSmallAtBottom": False,
"ratio": input_data.ratio,
"selectedAudio": input_data.background_music.value,
"selectedVoice": input_data.voice.voice_id,
"selectedAvatar": "https://cdn.revid.ai/avatars/young-woman.mp4",
"selectedAvatarType": "video/mp4",
"websiteToRecord": "",
"hasToGenerateCover": True,
"nbGenerations": 1,
"disableCaptions": False,
"characters": [],
"captionPresetName": "Revid",
"sourceType": "contentScraping",
"selectedStoryStyle": {"value": "custom", "label": "General"},
"generationPreset": "DEFAULT",
"hasToGenerateMusic": False,
"isOptimizedForChinese": False,
"generationUserPrompt": input_data.prompt,
"enableNsfwFilter": False,
"addStickers": False,
"typeMovingImageAnim": "dynamic",
"hasToGenerateSoundEffects": False,
"promptTargetDuration": input_data.prompt_target_duration,
"selectedCharacters": [],
"lang": "",
"voiceSpeed": 1,
"disableAudio": False,
"disableVoice": False,
"imageGenerationModel": "good",
"videoGenerationModel": "base",
"hasEnhancedGeneration": True,
"hasEnhancedGenerationPro": True,
"inputMedias": [],
"hasToGenerateVideos": True,
"audioUrl": input_data.background_music.audio_url,
"watermark": None,
},
}
response = self.create_video(credentials.api_key, payload)
pid = response.get("pid")
if not pid:
raise RuntimeError("Failed to create video: No project ID returned")
video_url = self.wait_for_video(credentials.api_key, pid)
yield "video_url", video_url
class AIScreenshotToVideoAdBlock(Block, _RevidMixin):
"""Creates an advert where the supplied screenshot is narrated by an AI avatar."""
class Input(BlockSchema):
credentials: CredentialsMetaInput[
Literal[ProviderName.REVID], Literal["api_key"]
] = CredentialsField(description="Revid.ai API key")
script: str = SchemaField(
description="Narration that will accompany the screenshot.",
placeholder="Check out these amazing stats!",
)
screenshot_url: str = SchemaField(
description="Screenshot or image URL to showcase."
)
ratio: str = SchemaField(default="9 / 16")
target_duration: int = SchemaField(default=30)
voice: Voice = SchemaField(default=Voice.EVA)
background_music: AudioTrack = SchemaField(
default=AudioTrack.DONT_STOP_ME_ABSTRACT_FUTURE_BASS
)
class Output(BlockSchema):
video_url: str = SchemaField(description="Rendered video URL")
error: str = SchemaField(description="Error, if encountered")
def __init__(self):
super().__init__(
id="9f68982c-3af6-4923-9a97-b50a8c8d2234",
description="Turns a screenshot into an engaging, avatarnarrated video advert.",
categories={BlockCategory.AI, BlockCategory.MARKETING},
input_schema=AIScreenshotToVideoAdBlock.Input,
output_schema=AIScreenshotToVideoAdBlock.Output,
test_input={
"credentials": TEST_CREDENTIALS_INPUT,
"script": "Amazing numbers!",
"screenshot_url": "https://cdn.revid.ai/uploads/1747080376028-image.png",
},
test_output=("video_url", "https://example.com/screenshot.mp4"),
test_mock={
"create_video": lambda api_key, payload: {"pid": "test_pid"},
"wait_for_video": lambda api_key, pid, max_wait_time=3600: "https://example.com/screenshot.mp4",
},
test_credentials=TEST_CREDENTIALS,
)
def run(self, input_data: Input, *, credentials: APIKeyCredentials, **kwargs):
payload = {
"webhook": None,
"creationParams": {
"targetDuration": input_data.target_duration,
"ratio": input_data.ratio,
"mediaType": "aiVideo",
"hasAvatar": True,
"removeAvatarBackground": True,
"inputText": input_data.script,
"flowType": "text-to-video",
"slug": "ai-ad-generator",
"slugNew": "screenshot-to-video-ad",
"isCopiedFrom": "ai-ad-generator",
"hasToGenerateVoice": True,
"hasToTranscript": False,
"hasToSearchMedia": True,
"hasWebsiteRecorder": False,
"hasTextSmallAtBottom": False,
"selectedAudio": input_data.background_music.value,
"selectedVoice": input_data.voice.voice_id,
"selectedAvatar": "https://cdn.revid.ai/avatars/young-woman.mp4",
"selectedAvatarType": "video/mp4",
"websiteToRecord": "",
"hasToGenerateCover": True,
"nbGenerations": 1,
"disableCaptions": False,
"mediaMultiplier": "medium",
"characters": [],
"captionPresetName": "Revid",
"sourceType": "contentScraping",
"selectedStoryStyle": {"value": "custom", "label": "General"},
"generationPreset": "DEFAULT",
"hasToGenerateMusic": False,
"isOptimizedForChinese": False,
"generationUserPrompt": "",
"enableNsfwFilter": False,
"addStickers": False,
"typeMovingImageAnim": "dynamic",
"hasToGenerateSoundEffects": False,
"forceModelType": "gpt-image-1",
"selectedCharacters": [],
"lang": "",
"voiceSpeed": 1,
"disableAudio": False,
"disableVoice": False,
"useOnlyProvidedMedia": True,
"imageGenerationModel": "ultra",
"videoGenerationModel": "base",
"hasEnhancedGeneration": True,
"hasEnhancedGenerationPro": True,
"inputMedias": [
{"url": input_data.screenshot_url, "title": "", "type": "image"}
],
"hasToGenerateVideos": True,
"audioUrl": input_data.background_music.audio_url,
"watermark": None,
},
}
response = self.create_video(credentials.api_key, payload)
pid = response.get("pid")
if not pid:
raise RuntimeError("Failed to create video: No project ID returned")
video_url = self.wait_for_video(credentials.api_key, pid)
yield "video_url", video_url

View File

@@ -0,0 +1,482 @@
from __future__ import annotations
import json
import logging
from dataclasses import dataclass
from enum import Enum
from typing import Any, Dict, List, Optional, Union
from backend.util.request import Requests
from backend.util.settings import Settings
logger = logging.getLogger(__name__)
settings = Settings()
class AyrshareAPIException(Exception):
def __init__(self, message: str, status_code: int):
super().__init__(message)
self.status_code = status_code
class SocialPlatform(str, Enum):
BLUESKY = "bluesky"
FACEBOOK = "facebook"
TWITTER = "twitter"
LINKEDIN = "linkedin"
INSTAGRAM = "instagram"
YOUTUBE = "youtube"
REDDIT = "reddit"
TELEGRAM = "telegram"
GMB = "gmb"
PINTEREST = "pinterest"
TIKTOK = "tiktok"
@dataclass
class EmailConfig:
to: str
subject: Optional[str] = None
body: Optional[str] = None
from_name: Optional[str] = None
from_email: Optional[str] = None
@dataclass
class JWTResponse:
status: str
title: str
token: str
url: str
emailSent: Optional[bool] = None
expiresIn: Optional[str] = None
@dataclass
class ProfileResponse:
status: str
title: str
refId: str
profileKey: str
messagingActive: Optional[bool] = None
@dataclass
class PostResponse:
status: str
id: str
refId: str
profileTitle: str
post: str
postIds: Optional[List[Dict[str, Any]]] = None
scheduleDate: Optional[str] = None
errors: Optional[List[str]] = None
@dataclass
class AutoHashtag:
max: Optional[int] = None
position: Optional[str] = None
@dataclass
class FirstComment:
text: str
platforms: Optional[List[SocialPlatform]] = None
@dataclass
class AutoSchedule:
interval: str
platforms: Optional[List[SocialPlatform]] = None
startDate: Optional[str] = None
endDate: Optional[str] = None
@dataclass
class AutoRepost:
interval: str
platforms: Optional[List[SocialPlatform]] = None
startDate: Optional[str] = None
endDate: Optional[str] = None
@dataclass
class PostError:
code: int
message: str
details: str
class AyrshareClient:
"""Client for the Ayrshare Social Media Post API"""
API_URL = "https://api.ayrshare.com/api"
POST_ENDPOINT = f"{API_URL}/post"
PROFILES_ENDPOINT = f"{API_URL}/profiles"
JWT_ENDPOINT = f"{PROFILES_ENDPOINT}/generateJWT"
def __init__(
self,
custom_requests: Optional[Requests] = None,
):
headers: Dict[str, str] = {
"Content-Type": "application/json",
"Authorization": f"Bearer {settings.secrets.ayrshare_api_key}",
}
self.headers = headers
if custom_requests:
self._requests = custom_requests
else:
self._requests = Requests(
extra_headers=headers,
trusted_origins=["https://api.ayrshare.com"],
raise_for_status=False,
)
def generate_jwt(
self,
private_key: str,
profile_key: str,
logout: Optional[bool] = None,
redirect: Optional[str] = None,
allowed_social: Optional[List[SocialPlatform]] = None,
verify: Optional[bool] = None,
base64: Optional[bool] = None,
expires_in: Optional[int] = None,
email: Optional[EmailConfig] = None,
) -> JWTResponse:
"""
Generate a JSON Web Token (JWT) for use with single sign on.
Args:
domain: Domain of app. Must match the domain given during onboarding.
private_key: Private Key used for encryption.
profile_key: User Profile Key (not the API Key).
logout: Automatically logout the current session.
redirect: URL to redirect to when the "Done" button or logo is clicked.
allowed_social: List of social networks to display in the linking page.
verify: Verify that the generated token is valid (recommended for non-production).
base64: Whether the private key is base64 encoded.
expires_in: Token longevity in minutes (1-2880).
email: Configuration for sending Connect Accounts email.
Returns:
JWTResponse object containing the JWT token and URL.
Raises:
AyrshareAPIException: If the API request fails or private key is invalid.
"""
payload: Dict[str, Any] = {
"domain": "id-pojeg",
"privateKey": private_key,
"profileKey": profile_key,
}
headers = self.headers
headers["Profile-Key"] = profile_key
if logout is not None:
payload["logout"] = logout
if redirect is not None:
payload["redirect"] = redirect
if allowed_social is not None:
payload["allowedSocial"] = [p.value for p in allowed_social]
if verify is not None:
payload["verify"] = verify
if base64 is not None:
payload["base64"] = base64
if expires_in is not None:
payload["expiresIn"] = expires_in
if email is not None:
payload["email"] = email.__dict__
response = self._requests.post(self.JWT_ENDPOINT, json=payload, headers=headers)
if not response.ok:
try:
error_data = response.json()
error_message = error_data.get("message", response.text)
except json.JSONDecodeError:
error_message = response.text
raise AyrshareAPIException(
f"Ayrshare API request failed ({response.status_code}): {error_message}",
response.status_code,
)
response_data = response.json()
if response_data.get("status") != "success":
raise AyrshareAPIException(
f"Ayrshare API returned error: {response_data.get('message', 'Unknown error')}",
response.status_code,
)
return JWTResponse(**response_data)
def create_profile(
self,
title: str,
messaging_active: Optional[bool] = None,
hide_top_header: Optional[bool] = None,
top_header: Optional[str] = None,
disable_social: Optional[List[SocialPlatform]] = None,
team: Optional[bool] = None,
email: Optional[str] = None,
sub_header: Optional[str] = None,
tags: Optional[List[str]] = None,
) -> ProfileResponse | PostError:
"""
Create a new User Profile under your Primary Profile.
Args:
title: Title of the new profile. Must be unique.
messaging_active: Set to true to activate messaging for this user profile.
hide_top_header: Hide the top header on the social accounts linkage page.
top_header: Change the header on the social accounts linkage page.
disable_social: Array of social networks that are disabled for this user's profile.
team: Create a new user profile as a team member.
email: Email address for team member invite (required if team is true).
sub_header: Change the sub header on the social accounts linkage page.
tags: Array of strings to tag user profiles.
Returns:
ProfileResponse object containing the profile details and profile key.
Raises:
AyrshareAPIException: If the API request fails or profile title already exists.
"""
payload: Dict[str, Any] = {
"title": title,
}
if messaging_active is not None:
payload["messagingActive"] = messaging_active
if hide_top_header is not None:
payload["hideTopHeader"] = hide_top_header
if top_header is not None:
payload["topHeader"] = top_header
if disable_social is not None:
payload["disableSocial"] = [p.value for p in disable_social]
if team is not None:
payload["team"] = team
if email is not None:
payload["email"] = email
if sub_header is not None:
payload["subHeader"] = sub_header
if tags is not None:
payload["tags"] = tags
response = self._requests.post(self.PROFILES_ENDPOINT, json=payload)
if not response.ok:
try:
error_data = response.json()
error_message = error_data.get("message", response.text)
except json.JSONDecodeError:
error_message = response.text
raise AyrshareAPIException(
f"Ayrshare API request failed ({response.status_code}): {error_message}",
response.status_code,
)
response_data = response.json()
if response_data.get("status") != "success":
raise AyrshareAPIException(
f"Ayrshare API returned error: {response_data.get('message', 'Unknown error')}",
response.status_code,
)
return ProfileResponse(**response_data)
def create_post(
self,
post: str,
platforms: List[SocialPlatform],
media_urls: Optional[List[str]] = None,
is_video: Optional[bool] = None,
schedule_date: Optional[str] = None,
first_comment: Optional[FirstComment] = None,
disable_comments: Optional[bool] = None,
shorten_links: Optional[bool] = None,
auto_schedule: Optional[AutoSchedule] = None,
auto_repost: Optional[AutoRepost] = None,
auto_hashtag: Optional[Union[AutoHashtag, bool]] = None,
unsplash: Optional[str] = None,
bluesky_options: Optional[Dict[str, Any]] = None,
facebook_options: Optional[Dict[str, Any]] = None,
gmb_options: Optional[Dict[str, Any]] = None,
instagram_options: Optional[Dict[str, Any]] = None,
linkedin_options: Optional[Dict[str, Any]] = None,
pinterest_options: Optional[Dict[str, Any]] = None,
reddit_options: Optional[Dict[str, Any]] = None,
telegram_options: Optional[Dict[str, Any]] = None,
threads_options: Optional[Dict[str, Any]] = None,
tiktok_options: Optional[Dict[str, Any]] = None,
twitter_options: Optional[Dict[str, Any]] = None,
youtube_options: Optional[Dict[str, Any]] = None,
requires_approval: Optional[bool] = None,
random_post: Optional[bool] = None,
random_media_url: Optional[bool] = None,
idempotency_key: Optional[str] = None,
notes: Optional[str] = None,
profile_key: Optional[str] = None,
) -> PostResponse | PostError:
"""
Create a post across multiple social media platforms.
Args:
post: The post text to be published
platforms: List of platforms to post to (e.g. [SocialPlatform.TWITTER, SocialPlatform.FACEBOOK])
media_urls: Optional list of media URLs to include
is_video: Whether the media is a video
schedule_date: UTC datetime for scheduling (YYYY-MM-DDThh:mm:ssZ)
first_comment: Configuration for first comment
disable_comments: Whether to disable comments
shorten_links: Whether to shorten links
auto_schedule: Configuration for automatic scheduling
auto_repost: Configuration for automatic reposting
auto_hashtag: Configuration for automatic hashtags
unsplash: Unsplash image configuration
bluesky_options: Bluesky-specific options
facebook_options: Facebook-specific options
gmb_options: Google Business Profile options
instagram_options: Instagram-specific options
linkedin_options: LinkedIn-specific options
pinterest_options: Pinterest-specific options
reddit_options: Reddit-specific options
telegram_options: Telegram-specific options
threads_options: Threads-specific options
tiktok_options: TikTok-specific options
twitter_options: Twitter-specific options
youtube_options: YouTube-specific options
requires_approval: Whether to enable approval workflow
random_post: Whether to generate random post text
random_media_url: Whether to generate random media
idempotency_key: Unique ID for the post
notes: Additional notes for the post
Returns:
PostResponse object containing the post details and status
Raises:
AyrshareAPIException: If the API request fails
"""
payload: Dict[str, Any] = {
"post": post,
"platforms": [p.value for p in platforms],
}
# Add optional parameters if provided
if media_urls:
payload["mediaUrls"] = media_urls
if is_video is not None:
payload["isVideo"] = is_video
if schedule_date:
payload["scheduleDate"] = schedule_date
if first_comment:
first_comment_dict = first_comment.__dict__.copy()
if first_comment.platforms:
first_comment_dict["platforms"] = [
p.value for p in first_comment.platforms
]
payload["firstComment"] = first_comment_dict
if disable_comments is not None:
payload["disableComments"] = disable_comments
if shorten_links is not None:
payload["shortenLinks"] = shorten_links
if auto_schedule:
auto_schedule_dict = auto_schedule.__dict__.copy()
if auto_schedule.platforms:
auto_schedule_dict["platforms"] = [
p.value for p in auto_schedule.platforms
]
payload["autoSchedule"] = auto_schedule_dict
if auto_repost:
auto_repost_dict = auto_repost.__dict__.copy()
if auto_repost.platforms:
auto_repost_dict["platforms"] = [p.value for p in auto_repost.platforms]
payload["autoRepost"] = auto_repost_dict
if auto_hashtag:
payload["autoHashtag"] = (
auto_hashtag.__dict__
if isinstance(auto_hashtag, AutoHashtag)
else auto_hashtag
)
if unsplash:
payload["unsplash"] = unsplash
if bluesky_options:
payload["blueskyOptions"] = bluesky_options
if facebook_options:
payload["faceBookOptions"] = facebook_options
if gmb_options:
payload["gmbOptions"] = gmb_options
if instagram_options:
payload["instagramOptions"] = instagram_options
if linkedin_options:
payload["linkedInOptions"] = linkedin_options
if pinterest_options:
payload["pinterestOptions"] = pinterest_options
if reddit_options:
payload["redditOptions"] = reddit_options
if telegram_options:
payload["telegramOptions"] = telegram_options
if threads_options:
payload["threadsOptions"] = threads_options
if tiktok_options:
payload["tikTokOptions"] = tiktok_options
if twitter_options:
payload["twitterOptions"] = twitter_options
if youtube_options:
payload["youTubeOptions"] = youtube_options
if requires_approval is not None:
payload["requiresApproval"] = requires_approval
if random_post is not None:
payload["randomPost"] = random_post
if random_media_url is not None:
payload["randomMediaUrl"] = random_media_url
if idempotency_key:
payload["idempotencyKey"] = idempotency_key
if notes:
payload["notes"] = notes
headers = self.headers
if profile_key:
headers["Profile-Key"] = profile_key
response = self._requests.post(
self.POST_ENDPOINT, json=payload, headers=headers
)
if not response.ok:
try:
error_data = response.json()
error_message = error_data.get("message", response.text)
error_code = error_data.get("code", response.status_code)
error_details = error_data.get("details", {})
logger.error(error_data)
return PostError(
code=error_code,
message=error_message,
details=error_details,
)
except json.JSONDecodeError:
error_message = response.text
raise AyrshareAPIException(
f"Ayrshare API request failed ({response.status_code}): {error_message}",
response.status_code,
)
response_data = response.json()
if response_data.get("status") != "success":
raise AyrshareAPIException(
f"Ayrshare API returned error: {response_data.get('message', 'Unknown error')}",
response.status_code,
)
# Return the first post from the response
return PostResponse(**response_data["posts"][0])

View File

@@ -0,0 +1,531 @@
import logging
from datetime import datetime
from typing import List, Optional
from pydantic import BaseModel, Field, SecretStr
from backend.blocks.ayrshare._api import (
AyrshareClient,
PostError,
PostResponse,
SocialPlatform,
)
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema, BlockType
from backend.data.model import SchemaField
from backend.integrations.credentials_store import IntegrationCredentialsStore
logger = logging.getLogger(__name__)
creads_store = IntegrationCredentialsStore()
class RequestOutput(BaseModel):
"""Base output model for Ayrshare social media posts."""
status: str = Field(..., description="Status of the post")
id: str = Field(..., description="ID of the post")
refId: str = Field(..., description="Reference ID of the post")
profileTitle: str = Field(..., description="Title of the profile")
post: str = Field(..., description="The post text")
postIds: Optional[List[dict]] = Field(
description="IDs of the posts on each platform"
)
scheduleDate: Optional[str] = Field(description="Scheduled date of the post")
errors: Optional[List[str]] = Field(description="Any errors that occurred")
class AyrsharePostBlockBase(Block):
"""Base class for Ayrshare social media posting blocks."""
class Input(BlockSchema):
"""Base input model for Ayrshare social media posts."""
post: str = SchemaField(
description="The post text to be published", default="", advanced=False
)
media_urls: List[str] = SchemaField(
description="Optional list of media URLs to include. Set is_video in advanced settings to true if you want to upload videos.",
default_factory=list,
advanced=False,
)
is_video: bool = SchemaField(
description="Whether the media is a video", default=False, advanced=True
)
schedule_date: Optional[datetime] = SchemaField(
description="UTC datetime for scheduling (YYYY-MM-DDThh:mm:ssZ)",
default=None,
advanced=True,
)
disable_comments: bool = SchemaField(
description="Whether to disable comments", default=False, advanced=True
)
shorten_links: bool = SchemaField(
description="Whether to shorten links", default=False, advanced=True
)
unsplash: Optional[str] = SchemaField(
description="Unsplash image configuration", default=None, advanced=True
)
requires_approval: bool = SchemaField(
description="Whether to enable approval workflow",
default=False,
advanced=True,
)
random_post: bool = SchemaField(
description="Whether to generate random post text",
default=False,
advanced=True,
)
random_media_url: bool = SchemaField(
description="Whether to generate random media", default=False, advanced=True
)
notes: Optional[str] = SchemaField(
description="Additional notes for the post", default=None, advanced=True
)
class Output(BlockSchema):
post_result: RequestOutput = SchemaField(description="The result of the post")
def __init__(
self,
id="b3a7b3b9-5169-410a-9d5c-fd625460fb14",
description="Ayrshare Post",
):
super().__init__(
# The unique identifier for the block, this value will be persisted in the DB.
# It should be unique and constant across the application run.
# Use the UUID format for the ID.
id=id,
# The description of the block, explaining what the block does.
description=description,
# The set of categories that the block belongs to.
# Each category is an instance of BlockCategory Enum.
categories={BlockCategory.SOCIAL},
# The type of block, this is used to determine the block type in the UI.
block_type=BlockType.AYRSHARE,
# The schema, defined as a Pydantic model, for the input data.
input_schema=AyrsharePostBlockBase.Input,
# The schema, defined as a Pydantic model, for the output data.
output_schema=AyrsharePostBlockBase.Output,
)
@staticmethod
def create_client():
return AyrshareClient()
def _create_post(
self,
input_data: "AyrsharePostBlockBase.Input",
platforms: List[SocialPlatform],
profile_key: Optional[str] = None,
) -> PostResponse | PostError:
client = self.create_client()
"""Create a post on the specified platforms."""
iso_date = (
input_data.schedule_date.isoformat() if input_data.schedule_date else None
)
response = client.create_post(
post=input_data.post,
platforms=platforms,
media_urls=input_data.media_urls,
is_video=input_data.is_video,
schedule_date=iso_date,
disable_comments=input_data.disable_comments,
shorten_links=input_data.shorten_links,
unsplash=input_data.unsplash,
requires_approval=input_data.requires_approval,
random_post=input_data.random_post,
random_media_url=input_data.random_media_url,
notes=input_data.notes,
profile_key=profile_key,
)
return response
def run(
self,
input_data: "AyrsharePostBlockBase.Input",
*,
profile_key: SecretStr,
**kwargs,
) -> BlockOutput:
"""Run the block."""
platforms = [SocialPlatform.FACEBOOK]
if not profile_key:
yield "error", "Please Link a social account via Ayrshare"
return
post_result = self._create_post(
input_data, platforms=platforms, profile_key=profile_key.get_secret_value()
)
if isinstance(post_result, PostError):
yield "error", post_result.message
return
yield "post_result", post_result
class PostToFacebookBlock(AyrsharePostBlockBase):
"""Block for posting to Facebook."""
def __init__(self):
super().__init__(
id="3352f512-3524-49ed-a08f-003042da2fc1",
description="Post to Facebook using Ayrshare",
)
def run(
self,
input_data: AyrsharePostBlockBase.Input,
*,
profile_key: SecretStr,
**kwargs,
) -> BlockOutput:
"""Post to Facebook."""
if not profile_key:
yield "error", "Please Link a social account via Ayrshare"
return
post_result = self._create_post(
input_data,
[SocialPlatform.FACEBOOK],
profile_key=profile_key.get_secret_value(),
)
if isinstance(post_result, PostError):
yield "error", post_result.message
return
yield "post_result", post_result
class PostToXBlock(AyrsharePostBlockBase):
"""Block for posting to X / Twitter."""
def __init__(self):
super().__init__(
id="9e8f844e-b4a5-4b25-80f2-9e1dd7d67625",
description="Post to X / Twitter using Ayrshare",
)
def run(
self,
input_data: AyrsharePostBlockBase.Input,
*,
profile_key: SecretStr,
**kwargs,
) -> BlockOutput:
"""Post to Twitter."""
if not profile_key:
yield "error", "Please Link a social account via Ayrshare"
return
post_result = self._create_post(
input_data,
[SocialPlatform.TWITTER],
profile_key=profile_key.get_secret_value(),
)
if isinstance(post_result, PostError):
yield "error", post_result.message
return
yield "post_result", post_result
class PostToLinkedInBlock(AyrsharePostBlockBase):
"""Block for posting to LinkedIn."""
def __init__(self):
super().__init__(
id="589af4e4-507f-42fd-b9ac-a67ecef25811",
description="Post to LinkedIn using Ayrshare",
)
def run(
self,
input_data: AyrsharePostBlockBase.Input,
*,
profile_key: SecretStr,
**kwargs,
) -> BlockOutput:
"""Post to LinkedIn."""
if not profile_key:
yield "error", "Please Link a social account via Ayrshare"
return
post_result = self._create_post(
input_data,
[SocialPlatform.LINKEDIN],
profile_key=profile_key.get_secret_value(),
)
if isinstance(post_result, PostError):
yield "error", post_result.message
return
yield "post_result", post_result
class PostToInstagramBlock(AyrsharePostBlockBase):
"""Block for posting to Instagram."""
def __init__(self):
super().__init__(
id="89b02b96-a7cb-46f4-9900-c48b32fe1552",
description="Post to Instagram using Ayrshare",
)
def run(
self,
input_data: AyrsharePostBlockBase.Input,
*,
profile_key: SecretStr,
**kwargs,
) -> BlockOutput:
"""Post to Instagram."""
if not profile_key:
yield "error", "Please Link a social account via Ayrshare"
return
post_result = self._create_post(
input_data,
[SocialPlatform.INSTAGRAM],
profile_key=profile_key.get_secret_value(),
)
if isinstance(post_result, PostError):
yield "error", post_result.message
return
yield "post_result", post_result
class PostToYouTubeBlock(AyrsharePostBlockBase):
"""Block for posting to YouTube."""
def __init__(self):
super().__init__(
id="0082d712-ff1b-4c3d-8a8d-6c7721883b83",
description="Post to YouTube using Ayrshare",
)
def run(
self,
input_data: AyrsharePostBlockBase.Input,
*,
profile_key: SecretStr,
**kwargs,
) -> BlockOutput:
"""Post to YouTube."""
if not profile_key:
yield "error", "Please Link a social account via Ayrshare"
return
post_result = self._create_post(
input_data,
[SocialPlatform.YOUTUBE],
profile_key=profile_key.get_secret_value(),
)
if isinstance(post_result, PostError):
yield "error", post_result.message
return
yield "post_result", post_result
class PostToRedditBlock(AyrsharePostBlockBase):
"""Block for posting to Reddit."""
def __init__(self):
super().__init__(
id="c7733580-3c72-483e-8e47-a8d58754d853",
description="Post to Reddit using Ayrshare",
)
def run(
self,
input_data: AyrsharePostBlockBase.Input,
*,
profile_key: SecretStr,
**kwargs,
) -> BlockOutput:
"""Post to Reddit."""
if not profile_key:
yield "error", "Please Link a social account via Ayrshare"
return
post_result = self._create_post(
input_data,
[SocialPlatform.REDDIT],
profile_key=profile_key.get_secret_value(),
)
if isinstance(post_result, PostError):
yield "error", post_result.message
return
yield "post_result", post_result
class PostToTelegramBlock(AyrsharePostBlockBase):
"""Block for posting to Telegram."""
def __init__(self):
super().__init__(
id="47bc74eb-4af2-452c-b933-af377c7287df",
description="Post to Telegram using Ayrshare",
)
def run(
self,
input_data: AyrsharePostBlockBase.Input,
*,
profile_key: SecretStr,
**kwargs,
) -> BlockOutput:
"""Post to Telegram."""
if not profile_key:
yield "error", "Please Link a social account via Ayrshare"
return
post_result = self._create_post(
input_data,
[SocialPlatform.TELEGRAM],
profile_key=profile_key.get_secret_value(),
)
if isinstance(post_result, PostError):
yield "error", post_result.message
return
yield "post_result", post_result
class PostToGMBBlock(AyrsharePostBlockBase):
"""Block for posting to Google My Business."""
def __init__(self):
super().__init__(
id="2c38c783-c484-4503-9280-ef5d1d345a7e",
description="Post to Google My Business using Ayrshare",
)
def run(
self,
input_data: AyrsharePostBlockBase.Input,
*,
profile_key: SecretStr,
**kwargs,
) -> BlockOutput:
"""Post to Google My Business."""
if not profile_key:
yield "error", "Please Link a social account via Ayrshare"
return
post_result = self._create_post(
input_data,
[SocialPlatform.GMB],
profile_key=profile_key.get_secret_value(),
)
if isinstance(post_result, PostError):
yield "error", post_result.message
return
yield "post_result", post_result
class PostToPinterestBlock(AyrsharePostBlockBase):
"""Block for posting to Pinterest."""
def __init__(self):
super().__init__(
id="3ca46e05-dbaa-4afb-9e95-5a429c4177e6",
description="Post to Pinterest using Ayrshare",
)
def run(
self,
input_data: AyrsharePostBlockBase.Input,
*,
profile_key: SecretStr,
**kwargs,
) -> BlockOutput:
"""Post to Pinterest."""
if not profile_key:
yield "error", "Please Link a social account via Ayrshare"
return
post_result = self._create_post(
input_data,
[SocialPlatform.PINTEREST],
profile_key=profile_key.get_secret_value(),
)
if isinstance(post_result, PostError):
yield "error", post_result.message
return
yield "post_result", post_result
class PostToTikTokBlock(AyrsharePostBlockBase):
"""Block for posting to TikTok."""
def __init__(self):
super().__init__(
id="7faf4b27-96b0-4f05-bf64-e0de54ae74e1",
description="Post to TikTok using Ayrshare",
)
def run(
self,
input_data: AyrsharePostBlockBase.Input,
*,
profile_key: SecretStr,
**kwargs,
) -> BlockOutput:
"""Post to TikTok."""
if not profile_key:
yield "error", "Please Link a social account via Ayrshare"
return
post_result = self._create_post(
input_data,
[SocialPlatform.TIKTOK],
profile_key=profile_key.get_secret_value(),
)
if isinstance(post_result, PostError):
yield "error", post_result.message
return
yield "post_result", post_result
class PostToBlueskyBlock(AyrsharePostBlockBase):
"""Block for posting to Bluesky."""
def __init__(self):
super().__init__(
id="cbd52c2a-06d2-43ed-9560-6576cc163283",
description="Post to Bluesky using Ayrshare",
)
def run(
self,
input_data: AyrsharePostBlockBase.Input,
*,
profile_key: SecretStr,
**kwargs,
) -> BlockOutput:
"""Post to Bluesky."""
if not profile_key:
yield "error", "Please Link a social account via Ayrshare"
return
post_result = self._create_post(
input_data,
[SocialPlatform.BLUESKY],
profile_key=profile_key.get_secret_value(),
)
if isinstance(post_result, PostError):
yield "error", post_result.message
return
yield "post_result", post_result
AYRSHARE_NODE_IDS = [
PostToBlueskyBlock().id,
PostToFacebookBlock().id,
PostToXBlock().id,
PostToLinkedInBlock().id,
PostToInstagramBlock().id,
PostToYouTubeBlock().id,
PostToRedditBlock().id,
PostToTelegramBlock().id,
PostToGMBBlock().id,
PostToPinterestBlock().id,
PostToTikTokBlock().id,
]

View File

@@ -1,12 +1,14 @@
import json
import logging
from enum import Enum
from io import BufferedReader
from typing import Any
from requests.exceptions import HTTPError, RequestException
from backend.data.block import Block, BlockCategory, BlockOutput, BlockSchema
from backend.data.model import SchemaField
from backend.util.file import MediaFileType, get_exec_file_path, store_media_file
from backend.util.request import requests
logger = logging.getLogger(name=__name__)
@@ -45,6 +47,10 @@ class SendWebRequestBlock(Block):
description="The body of the request",
default=None,
)
files: dict[str, MediaFileType] = SchemaField(
description="File fields mapping to MediaFileType for multipart upload",
default_factory=dict,
)
class Output(BlockSchema):
response: object = SchemaField(description="The response from the server")
@@ -61,7 +67,7 @@ class SendWebRequestBlock(Block):
output_schema=SendWebRequestBlock.Output,
)
def run(self, input_data: Input, **kwargs) -> BlockOutput:
def run(self, input_data: Input, *, graph_exec_id: str, **kwargs) -> BlockOutput:
body = input_data.body
if input_data.json_format:
@@ -74,11 +80,31 @@ class SendWebRequestBlock(Block):
# we should send it as plain text instead
input_data.json_format = False
# Prepare files for multipart upload using store_media_file
files: dict[str, BufferedReader] = {}
if input_data.files:
for field_name, media in input_data.files.items():
try:
rel_path = store_media_file(
graph_exec_id, media, return_content=False
)
abs_path = get_exec_file_path(graph_exec_id, rel_path)
files[field_name] = open(abs_path, "rb")
except Exception as e:
yield "error", f"Failed to prepare file '{field_name}': {e}"
for f in files.values():
try:
f.close()
except Exception:
pass
return
try:
response = requests.request(
input_data.method.value,
input_data.url,
headers=input_data.headers,
files=files if files else None,
json=body if input_data.json_format else None,
data=body if not input_data.json_format else None,
)
@@ -119,3 +145,11 @@ class SendWebRequestBlock(Block):
except Exception as e:
# Catch any other unexpected exceptions
yield "error", str(e)
finally:
# ensure cleanup of file handles
for f in files.values():
try:
f.close()
except Exception:
pass

View File

@@ -53,6 +53,7 @@ class BlockType(Enum):
WEBHOOK_MANUAL = "Webhook (manual)"
AGENT = "Agent"
AI = "AI"
AYRSHARE = "Ayrshare"
class BlockCategory(Enum):
@@ -76,6 +77,7 @@ class BlockCategory(Enum):
PRODUCTIVITY = "Block that helps with productivity"
ISSUE_TRACKING = "Block that helps with issue tracking"
MULTIMEDIA = "Block that interacts with multimedia content"
MARKETING = "Block that helps with marketing"
def dict(self) -> dict[str, str]:
return {"category": self.name, "description": self.value}

View File

@@ -15,7 +15,6 @@ from typing import (
Literal,
Optional,
Sequence,
TypedDict,
TypeVar,
get_args,
)
@@ -37,6 +36,7 @@ from pydantic_core import (
ValidationError,
core_schema,
)
from typing_extensions import TypedDict
from backend.integrations.providers import ProviderName
from backend.util.settings import Secrets
@@ -189,7 +189,7 @@ def SchemaField(
class _BaseCredentials(BaseModel):
id: str = Field(default_factory=lambda: str(uuid4()))
provider: str
title: Optional[str]
title: Optional[str] = None
@field_serializer("*")
def dump_secret_strings(value: Any, _info):
@@ -200,13 +200,13 @@ class _BaseCredentials(BaseModel):
class OAuth2Credentials(_BaseCredentials):
type: Literal["oauth2"] = "oauth2"
username: Optional[str]
username: Optional[str] = None
"""Username of the third-party service user that these credentials belong to"""
access_token: SecretStr
access_token_expires_at: Optional[int]
access_token_expires_at: Optional[int] = None
"""Unix timestamp (seconds) indicating when the access token expires (if at all)"""
refresh_token: Optional[SecretStr]
refresh_token_expires_at: Optional[int]
refresh_token: Optional[SecretStr] = None
refresh_token_expires_at: Optional[int] = None
"""Unix timestamp (seconds) indicating when the refresh token expires (if at all)"""
scopes: list[str]
metadata: dict[str, Any] = Field(default_factory=dict)
@@ -260,15 +260,32 @@ class OAuthState(BaseModel):
class UserMetadata(BaseModel):
integration_credentials: list[Credentials] = Field(default_factory=list)
"""⚠️ Deprecated; use `UserIntegrations.credentials` instead"""
integration_oauth_states: list[OAuthState] = Field(default_factory=list)
"""⚠️ Deprecated; use `UserIntegrations.oauth_states` instead"""
class UserMetadataRaw(TypedDict, total=False):
integration_credentials: list[dict]
"""⚠️ Deprecated; use `UserIntegrations.credentials` instead"""
integration_oauth_states: list[dict]
"""⚠️ Deprecated; use `UserIntegrations.oauth_states` instead"""
class UserIntegrations(BaseModel):
class ManagedCredentials(BaseModel):
"""Integration credentials managed by us, rather than by the user"""
ayrshare_profile_key: Optional[SecretStr] = None
@field_serializer("*")
def dump_secret_strings(value: Any, _info):
if isinstance(value, SecretStr):
return value.get_secret_value()
return value
managed_credentials: ManagedCredentials = Field(default_factory=ManagedCredentials)
credentials: list[Credentials] = Field(default_factory=list)
oauth_states: list[OAuthState] = Field(default_factory=list)

View File

@@ -124,7 +124,7 @@ async def get_user_integrations(user_id: str) -> UserIntegrations:
async def update_user_integrations(user_id: str, data: UserIntegrations):
encrypted_data = JSONCryptor().encrypt(data.model_dump())
encrypted_data = JSONCryptor().encrypt(data.model_dump(exclude_none=True))
await User.prisma().update(
where={"id": user_id},
data={"integrations": encrypted_data},

View File

@@ -38,6 +38,7 @@ from autogpt_libs.utils.cache import thread_cached
from prometheus_client import Gauge, start_http_server
from backend.blocks.agent import AgentExecutorBlock
from backend.blocks.ayrshare.post import AYRSHARE_NODE_IDS
from backend.data import redis
from backend.data.block import BlockData, BlockInput, BlockSchema, get_block
from backend.data.credit import UsageTransactionMetadata
@@ -217,6 +218,10 @@ def execute_node(
credentials, creds_lock = creds_manager.acquire(user_id, credentials_meta.id)
extra_exec_kwargs[field_name] = credentials
if node_block.id in AYRSHARE_NODE_IDS:
profile_key = creds_manager.store.get_ayrshare_profile_key(user_id)
extra_exec_kwargs["profile_key"] = profile_key
output_size = 0
try:
outputs: dict[str, Any] = {}

View File

@@ -1,6 +1,7 @@
import base64
import hashlib
import secrets
from contextlib import contextmanager
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING, Optional
@@ -177,6 +178,7 @@ zerobounce_credentials = APIKeyCredentials(
expires_at=None,
)
llama_api_credentials = APIKeyCredentials(
id="d44045af-1c33-4833-9e19-752313214de2",
provider="llama_api",
@@ -224,6 +226,8 @@ class IntegrationCredentialsStore:
return get_service_client(DatabaseManagerClient)
# =============== USER-MANAGED CREDENTIALS =============== #
def add_creds(self, user_id: str, credentials: Credentials) -> None:
with self.locked_user_integrations(user_id):
if self.get_creds_by_id(user_id, credentials.id):
@@ -282,6 +286,8 @@ class IntegrationCredentialsStore:
all_credentials.append(zerobounce_credentials)
if settings.secrets.google_maps_api_key:
all_credentials.append(google_maps_credentials)
if settings.secrets.llama_api_key:
all_credentials.append(llama_api_credentials)
return all_credentials
def get_creds_by_id(self, user_id: str, credentials_id: str) -> Credentials | None:
@@ -337,6 +343,19 @@ class IntegrationCredentialsStore:
]
self._set_user_integration_creds(user_id, filtered_credentials)
# ============== SYSTEM-MANAGED CREDENTIALS ============== #
def get_ayrshare_profile_key(self, user_id: str) -> SecretStr | None:
managed_user_creds = self._get_user_integrations(user_id).managed_credentials
return managed_user_creds.ayrshare_profile_key
def set_ayrshare_profile_key(self, user_id: str, profile_key: str) -> None:
_profile_key = SecretStr(profile_key)
with self.edit_user_integrations(user_id) as user_integrations:
user_integrations.managed_credentials.ayrshare_profile_key = _profile_key
# ===================== OAUTH STATES ===================== #
def store_state_token(
self, user_id: str, provider: str, scopes: list[str], use_pkce: bool = False
) -> tuple[str, str]:
@@ -353,16 +372,8 @@ class IntegrationCredentialsStore:
scopes=scopes,
)
with self.locked_user_integrations(user_id):
user_integrations = self._get_user_integrations(user_id)
oauth_states = user_integrations.oauth_states
oauth_states.append(state)
user_integrations.oauth_states = oauth_states
self.db_manager.update_user_integrations(
user_id=user_id, data=user_integrations
)
with self.edit_user_integrations(user_id) as user_integrations:
user_integrations.oauth_states.append(state)
return token, code_challenge
@@ -404,6 +415,17 @@ class IntegrationCredentialsStore:
return None
# =================== GET/SET HELPERS =================== #
@contextmanager
def edit_user_integrations(self, user_id: str):
with self.locked_user_integrations(user_id):
user_integrations = self._get_user_integrations(user_id)
yield user_integrations # yield to allow edits
self.db_manager.update_user_integrations(
user_id=user_id, data=user_integrations
)
def _set_user_integration_creds(
self, user_id: str, credentials: list[Credentials]
) -> None:

View File

@@ -1,11 +1,13 @@
import asyncio
import logging
from datetime import datetime, timedelta, timezone
from typing import TYPE_CHECKING, Annotated, Awaitable, Literal
from fastapi import APIRouter, Body, Depends, HTTPException, Path, Query, Request
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, SecretStr
from starlette.status import HTTP_404_NOT_FOUND
from backend.blocks.ayrshare._api import AyrshareClient, PostError, SocialPlatform
from backend.data.graph import set_node_webhook
from backend.data.integrations import (
WebhookEvent,
@@ -416,3 +418,72 @@ def _get_provider_oauth_handler(
client_secret=client_secret,
redirect_uri=f"{frontend_base_url}/auth/integrations/oauth_callback",
)
@router.get("/ayrshare/sso_url")
async def get_ayrshare_sso_url(
user_id: Annotated[str, Depends(get_user_id)],
) -> dict[str, str]:
"""
Generate an SSO URL for Ayrshare social media integration.
Returns:
dict: Contains the SSO URL for Ayrshare integration
"""
# Generate JWT and get SSO URL
client = AyrshareClient()
# Get or create profile key
profile_key = creds_manager.store.get_ayrshare_profile_key(user_id)
if not profile_key:
logger.info(f"Creating new Ayrshare profile for user {user_id}")
# Create new profile if none exists
profile = client.create_profile(title=f"User {user_id}", messaging_active=True)
if isinstance(profile, PostError):
logger.error(
f"Error creating Ayrshare profile for user {user_id}: {profile}"
)
raise HTTPException(
status_code=500, detail="Failed to create Ayrshare profile"
)
profile_key = profile.profileKey
creds_manager.store.set_ayrshare_profile_key(user_id, profile_key)
else:
logger.info(f"Using existing Ayrshare profile for user {user_id}")
# Convert SecretStr to string if needed
profile_key_str = (
profile_key.get_secret_value()
if isinstance(profile_key, SecretStr)
else str(profile_key)
)
private_key = settings.secrets.ayrshare_jwt_key
try:
logger.info(f"Generating JWT for user {user_id}")
jwt_response = client.generate_jwt(
private_key=private_key,
profile_key=profile_key_str,
allowed_social=[
SocialPlatform.FACEBOOK,
SocialPlatform.TWITTER,
SocialPlatform.LINKEDIN,
SocialPlatform.INSTAGRAM,
SocialPlatform.YOUTUBE,
SocialPlatform.REDDIT,
SocialPlatform.TELEGRAM,
SocialPlatform.GMB,
SocialPlatform.PINTEREST,
SocialPlatform.TIKTOK,
SocialPlatform.BLUESKY,
],
expires_in=2880,
verify=True,
)
except Exception as e:
logger.error(f"Error generating JWT for user {user_id}: {e}")
raise HTTPException(status_code=500, detail="Failed to generate JWT")
expire_at = datetime.now(timezone.utc) + timedelta(minutes=2880)
return {"sso_url": jwt_response.url, "expire_at": expire_at.isoformat()}

View File

@@ -82,6 +82,7 @@ async def test_get_library_agents(mocker):
@pytest.mark.asyncio(loop_scope="session")
@pytest.mark.skip(reason="Test not implemented yet")
async def test_add_agent_to_library(mocker):
await connect()
# Mock data

View File

@@ -438,7 +438,8 @@ class Secrets(UpdateTrackingModel["Secrets"], BaseSettings):
apollo_api_key: str = Field(default="", description="Apollo API Key")
smartlead_api_key: str = Field(default="", description="SmartLead API Key")
zerobounce_api_key: str = Field(default="", description="ZeroBounce API Key")
ayrshare_api_key: str = Field(default="", description="Ayrshare API Key")
ayrshare_jwt_key: str = Field(default="", description="Ayrshare private Key")
# Add more secret fields as needed
model_config = SettingsConfigDict(

View File

@@ -53,7 +53,7 @@ import {
CopyIcon,
ExitIcon,
} from "@radix-ui/react-icons";
import { FaKey } from "react-icons/fa";
import useCredits from "@/hooks/useCredits";
export type ConnectionData = Array<{
@@ -116,6 +116,8 @@ export const CustomNode = React.memo(
const flowContext = useContext(FlowContext);
const api = useBackendAPI();
const { formatCredits } = useCredits();
const [isLoading, setIsLoading] = useState(false);
let nodeFlowId = "";
if (data.uiType === BlockUIType.AGENT) {
@@ -249,6 +251,55 @@ export const CustomNode = React.memo(
return renderHandles(schema.properties);
};
const generateAyrshareSSOHandles = (
api: ReturnType<typeof useBackendAPI>,
) => {
const handleSSOLogin = async () => {
setIsLoading(true);
try {
const { sso_url } = await api.getAyrshareSSOUrl();
const popup = window.open(sso_url, "_blank", "popup=true");
if (!popup) {
throw new Error(
"Failed to open popup window. Please allow popups for this site.",
);
}
} catch (error) {
console.error("Error getting SSO URL:", error);
} finally {
setIsLoading(false);
}
};
return (
<div className="flex flex-col gap-2">
<Button
type="button"
variant="outline"
className="w-full"
onClick={handleSSOLogin}
disabled={isLoading}
>
{isLoading ? (
"Loading..."
) : (
<>
<FaKey className="mr-2 h-4 w-4" />
Connect Social Media Accounts
</>
)}
</Button>
<NodeHandle
title="SSO Token"
keyName="sso_token"
isConnected={false}
schema={{ type: "string" }}
side="right"
/>
</div>
);
};
const generateInputHandles = (
schema: BlockIORootSchema,
nodeType: BlockUIType,
@@ -826,8 +877,18 @@ export const CustomNode = React.memo(
(A Webhook URL will be generated when you save the agent)
</p>
))}
{data.inputSchema &&
generateInputHandles(data.inputSchema, data.uiType)}
{data.uiType === BlockUIType.AYRSHARE ? (
<>
{generateAyrshareSSOHandles(api)}
{generateInputHandles(
data.inputSchema,
BlockUIType.STANDARD,
)}
</>
) : (
data.inputSchema &&
generateInputHandles(data.inputSchema, data.uiType)
)}
</div>
</div>
) : (

View File

@@ -100,8 +100,9 @@ export default function CredentialsProvider({
}: {
children: React.ReactNode;
}) {
const [providers, setProviders] =
useState<CredentialsProvidersContextType | null>(null);
const [providers, setProviders] = useState<CredentialsProvidersContextType>(
{},
);
const api = useBackendAPI();
const addCredentials = useCallback(

View File

@@ -669,6 +669,10 @@ export default class BackendAPI {
await this._request("DELETE", `/library/presets/${presetId}`);
}
getAyrshareSSOUrl(): Promise<{ sso_url: string; expire_at: string }> {
return this._get("/integrations/ayrshare/sso_url");
}
executeLibraryAgentPreset(
presetId: string,
graphId: GraphID,

View File

@@ -580,6 +580,7 @@ export enum BlockUIType {
WEBHOOK_MANUAL = "Webhook (manual)",
AGENT = "Agent",
AI = "AI",
AYRSHARE = "Ayrshare",
}
export enum SpecialBlockID {