feat(nodes,ui): add processor events

This commit is contained in:
psychedelicious
2023-09-17 19:53:04 +10:00
parent 7a1fe7548b
commit cd9f0e026f
33 changed files with 213 additions and 117 deletions

View File

@@ -3,11 +3,11 @@ from typing import Optional
from fastapi import Body, Path, Query
from fastapi.routing import APIRouter
from invokeai.app.services.session_processor.session_processor_common import SessionProcessorStatusResult
from invokeai.app.services.session_queue.session_queue_common import ( # CancelByBatchIDsResult,
from invokeai.app.services.session_processor.session_processor_common import SessionProcessorStatus
from invokeai.app.services.session_queue.session_queue_common import (
QUEUE_ITEM_STATUS,
Batch,
BatchStatusResult,
BatchStatus,
CancelByBatchIDsResult,
ClearResult,
EnqueueBatchResult,
@@ -15,7 +15,7 @@ from invokeai.app.services.session_queue.session_queue_common import ( # Cancel
PruneResult,
SessionQueueItem,
SessionQueueItemDTO,
SessionQueueStatusResult,
SessionQueueStatus,
)
from invokeai.app.services.shared.models import CursorPaginatedResults
@@ -25,7 +25,7 @@ from ..dependencies import ApiDependencies
session_queue_router = APIRouter(prefix="/v1/queue", tags=["queue"])
class SessionQueueAndProcessorStatusResult(SessionQueueStatusResult, SessionProcessorStatusResult):
class SessionQueueAndProcessorStatusResult(SessionQueueStatus, SessionProcessorStatus):
"""The overall status of session queue and processor"""
pass
@@ -185,12 +185,12 @@ async def get_next_queue_item(
"/{queue_id}/status",
operation_id="get_queue_status",
responses={
200: {"model": SessionQueueStatusResult},
200: {"model": SessionQueueStatus},
},
)
async def get_queue_status(
queue_id: str = Path(description="The queue id to perform this operation on"),
) -> SessionQueueStatusResult:
) -> SessionQueueStatus:
"""Gets the status of the session queue"""
return ApiDependencies.invoker.services.session_queue.get_queue_status(queue_id)
@@ -199,12 +199,12 @@ async def get_queue_status(
"/{queue_id}/processor/status",
operation_id="get_processor_status",
responses={
200: {"model": SessionProcessorStatusResult},
200: {"model": SessionProcessorStatus},
},
)
async def get_processor_status(
queue_id: str = Path(description="The queue id to perform this operation on"),
) -> SessionProcessorStatusResult:
) -> SessionProcessorStatus:
"""Gets the status of the session queue"""
return ApiDependencies.invoker.services.session_processor.get_status()
@@ -213,13 +213,13 @@ async def get_processor_status(
"/{queue_id}/b/{batch_id}/status",
operation_id="get_batch_status",
responses={
200: {"model": BatchStatusResult},
200: {"model": BatchStatus},
},
)
async def get_batch_status(
queue_id: str = Path(description="The queue id to perform this operation on"),
batch_id: str = Path(description="The batch to get the status of"),
) -> BatchStatusResult:
) -> BatchStatus:
"""Gets the status of the session queue"""
return ApiDependencies.invoker.services.session_queue.get_batch_status(queue_id=queue_id, batch_id=batch_id)

View File

@@ -22,6 +22,10 @@ class SocketIO:
self.__sio.on("unsubscribe_queue", handler=self._handle_unsub_queue)
local_handler.register(event_name=EventServiceBase.queue_event, _func=self._handle_queue_event)
self.__sio.on("subscribe_processor", handler=self._handle_sub_processor)
self.__sio.on("unsubscribe_processor", handler=self._handle_unsub_processor)
local_handler.register(event_name=EventServiceBase.processor_event, _func=self._handle_processor_event)
async def _handle_session_event(self, event: Event):
await self.__sio.emit(
event=event[1]["event"],
@@ -51,3 +55,16 @@ class SocketIO:
async def _handle_unsub_queue(self, sid, data, *args, **kwargs):
if "queue_id" in data:
self.__sio.enter_room(sid, data["queue_id"])
async def _handle_processor_event(self, event: Event):
await self.__sio.emit(
event=event[1]["event"],
data=event[1]["data"],
room="processor",
)
async def _handle_sub_processor(self, sid, *args, **kwargs):
self.__sio.enter_room(sid, "processor")
async def _handle_unsub_processor(self, sid, *args, **kwargs):
self.__sio.enter_room(sid, "processor")

View File

@@ -4,6 +4,7 @@ from typing import Any, Optional
from invokeai.app.models.image import ProgressImage
from invokeai.app.services.model_manager_service import BaseModelType, ModelInfo, ModelType, SubModelType
from invokeai.app.services.session_processor.session_processor_common import SessionProcessorStatus
from invokeai.app.services.session_queue.session_queue_common import EnqueueBatchResult, SessionQueueItem
from invokeai.app.util.misc import get_timestamp
@@ -11,6 +12,7 @@ from invokeai.app.util.misc import get_timestamp
class EventServiceBase:
session_event: str = "session_event"
queue_event: str = "queue_event"
processor_event: str = "processor_event"
"""Basic event bus, to have an empty stand-in when not needed"""
@@ -26,13 +28,21 @@ class EventServiceBase:
)
def __emit_queue_event(self, event_name: str, payload: dict) -> None:
"""Queue events are emitted to a room with "queue" as the room name"""
"""Queue events are emitted to a room with queue_id as the room name"""
payload["timestamp"] = get_timestamp()
self.dispatch(
event_name=EventServiceBase.queue_event,
payload=dict(event=event_name, data=payload),
)
def __emit_processor_event(self, event_name: str, payload: dict) -> None:
"""Processor events are emitted to a room with "processor" as the room name"""
payload["timestamp"] = get_timestamp()
self.dispatch(
event_name=EventServiceBase.processor_event,
payload=dict(event=event_name, data=payload),
)
# Define events here for every event in the system.
# This will make them easier to integrate until we find a schema generator.
def emit_generator_progress(
@@ -233,3 +243,7 @@ class EventServiceBase:
event_name="queue_cleared",
payload=dict(queue_id=queue_id),
)
def emit_processor_status_changed(self, processor_status: SessionProcessorStatus) -> None:
"""Emitted when the queue is cleared"""
self.__emit_processor_event(event_name="processor_status_changed", payload=processor_status.dict())

View File

@@ -1,6 +1,6 @@
from abc import ABC, abstractmethod
from invokeai.app.services.session_processor.session_processor_common import SessionProcessorStatusResult
from invokeai.app.services.session_processor.session_processor_common import SessionProcessorStatus
class SessionProcessorBase(ABC):
@@ -23,6 +23,6 @@ class SessionProcessorBase(ABC):
pass
@abstractmethod
def get_status(self) -> SessionProcessorStatusResult:
def get_status(self) -> SessionProcessorStatus:
"""Gets the status of the session processor"""
pass

View File

@@ -1,7 +1,7 @@
from pydantic import BaseModel, Field
class SessionProcessorStatusResult(BaseModel):
class SessionProcessorStatus(BaseModel):
is_started: bool = Field(description="Whether the session processor is started")
is_processing: bool = Field(description="Whether a session is being processed")
is_stop_pending: bool = Field(description="Whether processor is pending stopping")

View File

@@ -11,7 +11,7 @@ from invokeai.app.services.session_queue.session_queue_common import SessionQueu
from ..invoker import Invoker
from .session_processor_base import SessionProcessorBase
from .session_processor_common import SessionProcessorStatusResult
from .session_processor_common import SessionProcessorStatus
POLLING_INTERVAL = 1
THREAD_LIMIT = 1
@@ -33,6 +33,7 @@ class DefaultSessionProcessor(SessionProcessorBase):
def stop(self, *args, **kwargs) -> None:
self.__stop_event.set()
self._emit_status_changed()
def _poll_now(self) -> None:
self.__poll_now_event.set()
@@ -48,6 +49,7 @@ class DefaultSessionProcessor(SessionProcessorBase):
),
)
self.__thread.start()
self._emit_status_changed()
async def _on_session_event(self, event: FastAPIEvent) -> None:
event_name = event[1]["event"]
@@ -81,8 +83,11 @@ class DefaultSessionProcessor(SessionProcessorBase):
def _is_stop_pending(self) -> bool:
return self.__stop_event.is_set()
def get_status(self) -> SessionProcessorStatusResult:
return SessionProcessorStatusResult(
def _emit_status_changed(self) -> None:
self.__invoker.services.events.emit_processor_status_changed(self.get_status())
def get_status(self) -> SessionProcessorStatus:
return SessionProcessorStatus(
is_started=self._is_started(),
is_processing=self._is_processing(),
is_stop_pending=self._is_stop_pending(),
@@ -92,10 +97,12 @@ class DefaultSessionProcessor(SessionProcessorBase):
if self._is_started():
return
self.__stop_event.clear()
self._emit_status_changed()
self._start_thread()
def pause(self) -> None:
self.__stop_event.set()
self._emit_status_changed()
def __process(
self,
@@ -117,7 +124,6 @@ class DefaultSessionProcessor(SessionProcessorBase):
# TODO: Why isn't the log level specified in dependencies.py working?
# Within the thread, it is always INFO and `logger.debug()` doesn't display.
# self.__invoker.services.logger.debug(f"Executing queue item {queue_item.item_id}")
print(f"Executing queue item {queue_item.item_id}")
self.__queue_item = queue_item
self.__invoker.services.graph_execution_manager.set(queue_item.session)
self.__invoker.invoke(queue_item.session, invoke_all=True)
@@ -125,7 +131,6 @@ class DefaultSessionProcessor(SessionProcessorBase):
if queue_item is None:
# self.__invoker.services.logger.debug("Waiting for next polling interval or event")
print("Waiting for next polling interval or event")
poll_now_event.wait(POLLING_INTERVAL)
continue
except Exception:
@@ -135,3 +140,4 @@ class DefaultSessionProcessor(SessionProcessorBase):
poll_now_event.clear()
self.__queue_item = None
self.__threadLimit.release()
self._emit_status_changed()

View File

@@ -5,7 +5,7 @@ from invokeai.app.services.graph import Graph
from invokeai.app.services.session_queue.session_queue_common import (
QUEUE_ITEM_STATUS,
Batch,
BatchStatusResult,
BatchStatus,
CancelByBatchIDsResult,
CancelByQueueIDResult,
ClearResult,
@@ -16,7 +16,7 @@ from invokeai.app.services.session_queue.session_queue_common import (
PruneResult,
SessionQueueItem,
SessionQueueItemDTO,
SessionQueueStatusResult,
SessionQueueStatus,
)
from invokeai.app.services.shared.models import CursorPaginatedResults
@@ -70,12 +70,12 @@ class SessionQueueBase(ABC):
pass
@abstractmethod
def get_queue_status(self, queue_id: str) -> SessionQueueStatusResult:
def get_queue_status(self, queue_id: str) -> SessionQueueStatus:
"""Gets the status of the queue"""
pass
@abstractmethod
def get_batch_status(self, queue_id: str, batch_id: str) -> BatchStatusResult:
def get_batch_status(self, queue_id: str, batch_id: str) -> BatchStatus:
"""Gets the status of a batch"""
pass

View File

@@ -230,7 +230,7 @@ class SessionQueueItem(SessionQueueItemWithoutGraph):
# region Query Results
class SessionQueueStatusResult(BaseModel):
class SessionQueueStatus(BaseModel):
queue_id: str = Field(..., description="The ID of the queue")
pending: int = Field(..., description="Number of queue items with status 'pending'")
in_progress: int = Field(..., description="Number of queue items with status 'in_progress'")
@@ -240,7 +240,7 @@ class SessionQueueStatusResult(BaseModel):
total: int = Field(..., description="Total number of queue items")
class BatchStatusResult(BaseModel):
class BatchStatus(BaseModel):
queue_id: str = Field(..., description="The ID of the queue")
batch_id: str = Field(..., description="The ID of the batch")
pending: int = Field(..., description="Number of queue items with status 'pending'")

View File

@@ -13,7 +13,7 @@ from invokeai.app.services.session_queue.session_queue_common import (
DEFAULT_QUEUE_ID,
QUEUE_ITEM_STATUS,
Batch,
BatchStatusResult,
BatchStatus,
CancelByBatchIDsResult,
CancelByQueueIDResult,
ClearResult,
@@ -25,7 +25,7 @@ from invokeai.app.services.session_queue.session_queue_common import (
SessionQueueItem,
SessionQueueItemDTO,
SessionQueueItemNotFoundError,
SessionQueueStatusResult,
SessionQueueStatus,
calc_session_count,
prepare_values_to_insert,
)
@@ -775,7 +775,7 @@ class SqliteSessionQueue(SessionQueueBase):
self.__lock.release()
return CursorPaginatedResults(items=items, limit=limit, has_more=has_more)
def get_queue_status(self, queue_id: str) -> SessionQueueStatusResult:
def get_queue_status(self, queue_id: str) -> SessionQueueStatus:
try:
self.__lock.acquire()
self.__cursor.execute(
@@ -796,7 +796,7 @@ class SqliteSessionQueue(SessionQueueBase):
finally:
self.__lock.release()
return SessionQueueStatusResult(
return SessionQueueStatus(
queue_id=queue_id,
pending=counts.get("pending", 0),
in_progress=counts.get("in_progress", 0),
@@ -806,7 +806,7 @@ class SqliteSessionQueue(SessionQueueBase):
total=total,
)
def get_batch_status(self, queue_id: str, batch_id: str) -> BatchStatusResult:
def get_batch_status(self, queue_id: str, batch_id: str) -> BatchStatus:
try:
self.__lock.acquire()
self.__cursor.execute(
@@ -829,7 +829,7 @@ class SqliteSessionQueue(SessionQueueBase):
finally:
self.__lock.release()
return BatchStatusResult(
return BatchStatus(
batch_id=batch_id,
queue_id=queue_id,
pending=counts.get("pending", 0),

View File

@@ -208,19 +208,19 @@
"queueTotal": "{{total}} Total",
"queueEmpty": "Queue Empty",
"enqueueing": "Queueing Sessions...",
"start": "Start",
"startTooltip": "Start Queue",
"startSucceeded": "Queue Started",
"startFailed": "Problem Starting Queue",
"stop": "Stop",
"stopTooltip": "Stop Queue After Current Item",
"stopRequested": "Stopping Queue After Current Item",
"stopSucceeded": "Queue Stopped",
"stopFailed": "Problem Stopping Queue",
"resume": "Resume",
"resumeTooltip": "Resume Processor",
"resumeSucceeded": "Processor Resumed",
"resumeFailed": "Problem Resuming Processor",
"pause": "Pause",
"pauseTooltip": "Pause Processor",
"pauseRequested": "Pausing Processor",
"pauseSucceeded": "Processor Paused",
"pauseFailed": "Problem Pausing Processor",
"cancel": "Cancel",
"cancelTooltip": "Cancel Current Item and Stop Queue",
"cancelSucceeded": "Queue Canceled",
"cancelFailed": "Problem Canceling Queue",
"cancelTooltip": "Cancel Current Item",
"cancelSucceeded": "Current Item Canceled",
"cancelFailed": "Problem Canceling Current Item",
"prune": "Prune",
"pruneTooltip": "Prune {{item_count}} Completed Items",
"pruneSucceeded": "Pruned {{item_count}} Completed Items from Queue",

View File

@@ -86,6 +86,7 @@ import { addEnqueueRequestedLinear } from './listeners/enqueueRequestedLinear';
import { addWorkflowLoadedListener } from './listeners/workflowLoaded';
import { addDynamicPromptsListener } from './listeners/promptChanged';
import { addSocketQueueItemStatusChangedEventListener } from './listeners/socketio/socketQueueItemStatusChanged';
import { addProcessorStatusChangedEventListener } from './listeners/socketio/socketProcessorStatusChanged';
export const listenerMiddleware = createListenerMiddleware();
@@ -174,6 +175,7 @@ addModelLoadEventListener();
addSessionRetrievalErrorEventListener();
addInvocationRetrievalErrorEventListener();
addSocketQueueItemStatusChangedEventListener();
addProcessorStatusChangedEventListener();
// Session Created
addSessionCreatedPendingListener();

View File

@@ -25,7 +25,7 @@ export const addBatchEnqueuedListener = () => {
dispatch(
queueApi.endpoints.resumeProcessor.initiate(undefined, {
fixedCacheKey: 'startQueue',
fixedCacheKey: 'resumeProcessor',
})
);

View File

@@ -48,7 +48,7 @@ export const addControlNetImageProcessedListener = () => {
req.reset();
dispatch(
queueApi.endpoints.resumeProcessor.initiate(undefined, {
fixedCacheKey: 'startQueue',
fixedCacheKey: 'resumeProcessor',
})
);
log.debug(

View File

@@ -141,7 +141,7 @@ export const addEnqueueRequestedCanvasListener = () => {
req.reset();
dispatch(
queueApi.endpoints.resumeProcessor.initiate(undefined, {
fixedCacheKey: 'startQueue',
fixedCacheKey: 'resumeProcessor',
})
);

View File

@@ -52,7 +52,7 @@ export const addEnqueueRequestedLinear = () => {
dispatch(
queueApi.endpoints.resumeProcessor.initiate(undefined, {
fixedCacheKey: 'startQueue',
fixedCacheKey: 'resumeProcessor',
})
);

View File

@@ -36,7 +36,7 @@ export const addEnqueueRequestedNodes = () => {
dispatch(
queueApi.endpoints.resumeProcessor.initiate(undefined, {
fixedCacheKey: 'startQueue',
fixedCacheKey: 'resumeProcessor',
})
);

View File

@@ -0,0 +1,19 @@
import { logger } from 'app/logging/logger';
import { queueApi } from 'services/api/endpoints/queue';
import {
appSocketProcessorStatusChanged,
socketProcessorStatusChanged,
} from 'services/events/actions';
import { startAppListening } from '../..';
export const addProcessorStatusChangedEventListener = () => {
startAppListening({
actionCreator: socketProcessorStatusChanged,
effect: (action, { dispatch }) => {
const log = logger('socketio');
log.debug(action.payload, 'Processor status changed');
dispatch(appSocketProcessorStatusChanged(action.payload));
dispatch(queueApi.util.invalidateTags(['SessionProcessorStatus']));
},
});
};

View File

@@ -39,7 +39,7 @@ export const addUpscaleRequestedListener = () => {
req.reset();
dispatch(
queueApi.endpoints.resumeProcessor.initiate(undefined, {
fixedCacheKey: 'startQueue',
fixedCacheKey: 'resumeProcessor',
})
);
log.debug(

View File

@@ -24,7 +24,7 @@ export const enqueueBatch = async (
dispatch(
queueApi.endpoints.resumeProcessor.initiate(undefined, {
fixedCacheKey: 'startQueue',
fixedCacheKey: 'resumeProcessor',
})
);

View File

@@ -28,14 +28,14 @@ const PauseProcessorButton = ({ asIconButton }: Props) => {
await pauseProcessor().unwrap();
dispatch(
addToast({
title: t('queue.stopRequested'),
title: t('queue.pauseRequested'),
status: 'info',
})
);
} catch {
dispatch(
addToast({
title: t('queue.stopFailed'),
title: t('queue.pauseFailed'),
status: 'error',
})
);
@@ -45,8 +45,8 @@ const PauseProcessorButton = ({ asIconButton }: Props) => {
return (
<QueueButton
asIconButton={asIconButton}
label={t('queue.stop')}
tooltip={t('queue.stopTooltip')}
label={t('queue.pause')}
tooltip={t('queue.pauseTooltip')}
isDisabled={!processorStatus?.is_started || isQueueMutationInProgress}
isLoading={processorStatus?.is_stop_pending}
icon={<FaStop />}

View File

@@ -9,6 +9,7 @@ import { useHotkeys } from 'react-hotkeys-hook';
import { useTranslation } from 'react-i18next';
import { useIsQueueMutationInProgress } from '../hooks/useIsQueueMutationInProgress';
import EnqueueButtonTooltip from './QueueButtonTooltip';
import { useIsQueueEmpty } from '../hooks/useIsQueueEmpty';
const QueueBackButton = () => {
const tabName = useAppSelector(activeTabNameSelector);
@@ -16,6 +17,7 @@ const QueueBackButton = () => {
const { isReady } = useIsReadyToEnqueue();
const dispatch = useAppDispatch();
const isQueueMutationInProgress = useIsQueueMutationInProgress();
const isEmpty = useIsQueueEmpty();
const handleEnqueue = useCallback(() => {
dispatch(clampSymmetrySteps());
@@ -41,7 +43,7 @@ const QueueBackButton = () => {
flexGrow={3}
minW={44}
>
{t('queue.queueBack')}
{isEmpty ? t('parameters.invoke.invoke') : t('queue.queueBack')}
</IAIButton>
);
};

View File

@@ -1,10 +1,10 @@
import { ButtonGroup, Flex, Spacer, Text } from '@chakra-ui/react';
import CancelCurrentQueueItemButton from 'features/queue/components/CancelCurrentQueueItemButton';
import ClearQueueButton from 'features/queue/components/ClearQueueButton';
import PauseProcessorButton from 'features/queue/components/PauseProcessorButton';
import QueueBackButton from 'features/queue/components/QueueBackButton';
import QueueFrontButton from 'features/queue/components/QueueFrontButton';
import ResumeProcessorButton from 'features/queue/components/StartQueueButton';
import PauseProcessorButton from 'features/queue/components/StopQueueButton';
import ResumeProcessorButton from 'features/queue/components/ResumeProcessorButton';
import ProgressBar from 'features/system/components/ProgressBar';
import { memo } from 'react';
import { useTranslation } from 'react-i18next';

View File

@@ -8,6 +8,7 @@ import { memo, useCallback } from 'react';
import { useHotkeys } from 'react-hotkeys-hook';
import { useTranslation } from 'react-i18next';
import { FaBoltLightning } from 'react-icons/fa6';
import { useIsQueueEmpty } from '../hooks/useIsQueueEmpty';
import { useIsQueueMutationInProgress } from '../hooks/useIsQueueMutationInProgress';
import EnqueueButtonTooltip from './QueueButtonTooltip';
@@ -17,6 +18,7 @@ const QueueFrontButton = () => {
const { isReady } = useIsReadyToEnqueue();
const { t } = useTranslation();
const isQueueMutationInProgress = useIsQueueMutationInProgress();
const isEmpty = useIsQueueEmpty();
const handleEnqueue = useCallback(() => {
dispatch(clampSymmetrySteps());
dispatch(enqueueRequested({ tabName, prepend: true }));
@@ -36,7 +38,7 @@ const QueueFrontButton = () => {
<IAIIconButton
colorScheme="base"
aria-label={t('queue.queueFront')}
isDisabled={!isReady || isQueueMutationInProgress}
isDisabled={!isReady || isQueueMutationInProgress || isEmpty}
onClick={handleEnqueue}
tooltip={<EnqueueButtonTooltip prepend />}
icon={<FaBoltLightning />}

View File

@@ -19,15 +19,7 @@ type Props = {
};
const QueueItemComponent = ({ queueItemDTO }: Props) => {
const {
batch_id,
completed_at,
error,
item_id,
session_id,
started_at,
status,
} = queueItemDTO;
const { session_id, batch_id, item_id } = queueItemDTO;
const { t } = useTranslation();
const isQueueMutationInProgress = useIsQueueMutationInProgress();
const [cancelQueueItem, { isLoading: isLoadingCancelQueueItem }] =
@@ -64,13 +56,17 @@ const QueueItemComponent = ({ queueItemDTO }: Props) => {
);
const { data: queueItem } = useGetQueueItemQuery(item_id);
const executionTime = useMemo(() => {
if (!completed_at || !started_at) {
if (!queueItem?.completed_at || !queueItem?.started_at) {
return 'n/a';
}
return String(
((Date.parse(completed_at) - Date.parse(started_at)) / 1000).toFixed(2)
(
(Date.parse(queueItem.completed_at) -
Date.parse(queueItem.started_at)) /
1000
).toFixed(2)
);
}, [completed_at, started_at]);
}, [queueItem?.completed_at, queueItem?.started_at]);
return (
<Flex
@@ -97,7 +93,11 @@ const QueueItemComponent = ({ queueItemDTO }: Props) => {
<IAIButton
onClick={handleCancelQueueItem}
isLoading={isLoadingCancelQueueItem}
isDisabled={['canceled', 'completed', 'failed'].includes(status)}
isDisabled={
queueItem
? ['canceled', 'completed', 'failed'].includes(queueItem.status)
: true
}
aria-label={t('queue.cancelItem')}
icon={<FaTimes />}
colorScheme="error"
@@ -116,7 +116,7 @@ const QueueItemComponent = ({ queueItemDTO }: Props) => {
</IAIButton>
</ButtonGroup>
</Flex>
{error && (
{queueItem?.error && (
<Flex
layerStyle="second"
p={3}
@@ -129,7 +129,7 @@ const QueueItemComponent = ({ queueItemDTO }: Props) => {
<Heading size="sm" color="error.500" _dark={{ color: 'error.400' }}>
Error
</Heading>
<pre>{error}</pre>
<pre>{queueItem.error}</pre>
</Flex>
)}
<Flex

View File

@@ -28,14 +28,14 @@ const ResumeProcessorButton = ({ asIconButton }: Props) => {
await resumeProcessor().unwrap();
dispatch(
addToast({
title: t('queue.startSucceeded'),
title: t('queue.resumeSucceeded'),
status: 'success',
})
);
} catch {
dispatch(
addToast({
title: t('queue.startFailed'),
title: t('queue.resumeFailed'),
status: 'error',
})
);
@@ -45,8 +45,8 @@ const ResumeProcessorButton = ({ asIconButton }: Props) => {
return (
<QueueButton
asIconButton={asIconButton}
label={t('queue.start')}
tooltip={t('queue.startTooltip')}
label={t('queue.resume')}
tooltip={t('queue.resumeTooltip')}
isDisabled={
processorStatus?.is_started ||
processorStatus?.is_processing ||

View File

@@ -3,8 +3,8 @@ import { memo } from 'react';
import CancelCurrentQueueItemButton from './CancelCurrentQueueItemButton';
import ClearQueueButton from './ClearQueueButton';
import PruneQueueButton from './PruneQueueButton';
import ResumeProcessorButton from './StartQueueButton';
import PauseProcessorButton from './StopQueueButton';
import ResumeProcessorButton from './ResumeProcessorButton';
import PauseProcessorButton from './PauseProcessorButton';
type Props = ButtonGroupProps & {
asIconButtons?: boolean;

View File

@@ -0,0 +1,13 @@
import { useGetQueueStatusQuery } from 'services/api/endpoints/queue';
export const useIsQueueEmpty = () => {
const { isEmpty } = useGetQueueStatusQuery(undefined, {
selectFromResult: ({ data }) => {
if (!data) {
return { isEmpty: true };
}
return { isEmpty: data.in_progress === 0 && data.pending === 0 };
},
});
return isEmpty;
};

View File

@@ -4,12 +4,14 @@ export interface QueueState {
listCursor: number | undefined;
listPriority: number | undefined;
selectedQueueItem: string | undefined;
resumeProcessorOnEnqueue: boolean;
}
export const initialQueueState: QueueState = {
listCursor: undefined,
listPriority: undefined,
selectedQueueItem: undefined,
resumeProcessorOnEnqueue: true,
};
const initialState: QueueState = initialQueueState;
@@ -38,6 +40,12 @@ export const queueSlice = createSlice({
state.selectedQueueItem = action.payload;
}
},
resumeProcessorOnEnqueueChanged: (
state,
action: PayloadAction<boolean>
) => {
state.resumeProcessorOnEnqueue = action.payload;
},
},
});
@@ -46,6 +54,7 @@ export const {
listPriorityChanged,
listParamsReset,
queueItemSelectionToggled,
resumeProcessorOnEnqueueChanged,
} = queueSlice.actions;
export default queueSlice.reducer;

View File

@@ -19,7 +19,7 @@ import {
appSocketInvocationError,
appSocketInvocationRetrievalError,
appSocketInvocationStarted,
appSocketQueueStatusChanged,
appSocketProcessorStatusChanged,
appSocketSessionRetrievalError,
} from 'services/events/actions';
import { ProgressImage } from 'services/events/types';
@@ -196,9 +196,9 @@ export const systemSlice = createSlice({
},
},
extraReducers(builder) {
builder.addCase(appSocketQueueStatusChanged, (state, action) => {
const { started, stop_after_current } = action.payload.data;
if (!started && !stop_after_current) {
builder.addCase(appSocketProcessorStatusChanged, (state, action) => {
const { is_started, is_stop_pending } = action.payload.data;
if (!is_started && !is_stop_pending) {
state.progressImage = null;
}
});

View File

@@ -556,8 +556,8 @@ export type components = {
*/
items?: (string | number)[];
};
/** BatchStatusResult */
BatchStatusResult: {
/** BatchStatus */
BatchStatus: {
/**
* Queue Id
* @description The ID of the queue
@@ -1177,6 +1177,11 @@ export type components = {
* @description The workflow to save with the image
*/
workflow?: string;
/**
* CLIP
* @description CLIP (tokenizer, text encoder, LoRAs) and skipped layer count
*/
clip?: components["schemas"]["ClipField"];
/**
* Skipped Layers
* @description Number of layers to skip in text encoder
@@ -1189,11 +1194,6 @@ export type components = {
* @enum {string}
*/
type: "clip_skip";
/**
* CLIP
* @description CLIP (tokenizer, text encoder, LoRAs) and skipped layer count
*/
clip?: components["schemas"]["ClipField"];
};
/**
* ClipSkipInvocationOutput
@@ -6948,8 +6948,8 @@ export type components = {
*/
type: "segment_anything_processor";
};
/** SessionProcessorStatusResult */
SessionProcessorStatusResult: {
/** SessionProcessorStatus */
SessionProcessorStatus: {
/**
* Is Started
* @description Whether the session processor is started
@@ -7119,8 +7119,8 @@ export type components = {
*/
completed_at?: string;
};
/** SessionQueueStatusResult */
SessionQueueStatusResult: {
/** SessionQueueStatus */
SessionQueueStatus: {
/**
* Queue Id
* @description The ID of the queue
@@ -8149,18 +8149,18 @@ export type components = {
/** Ui Order */
ui_order?: number;
};
/**
* StableDiffusion2ModelFormat
* @description An enumeration.
* @enum {string}
*/
StableDiffusion2ModelFormat: "checkpoint" | "diffusers";
/**
* StableDiffusionOnnxModelFormat
* @description An enumeration.
* @enum {string}
*/
StableDiffusionOnnxModelFormat: "olive" | "onnx";
/**
* StableDiffusion2ModelFormat
* @description An enumeration.
* @enum {string}
*/
StableDiffusion2ModelFormat: "checkpoint" | "diffusers";
/**
* ControlNetModelFormat
* @description An enumeration.
@@ -9894,7 +9894,7 @@ export type operations = {
/** @description Successful Response */
200: {
content: {
"application/json": components["schemas"]["SessionQueueStatusResult"];
"application/json": components["schemas"]["SessionQueueStatus"];
};
};
/** @description Validation Error */
@@ -9920,7 +9920,7 @@ export type operations = {
/** @description Successful Response */
200: {
content: {
"application/json": components["schemas"]["SessionProcessorStatusResult"];
"application/json": components["schemas"]["SessionProcessorStatus"];
};
};
/** @description Validation Error */
@@ -9948,7 +9948,7 @@ export type operations = {
/** @description Successful Response */
200: {
content: {
"application/json": components["schemas"]["BatchStatusResult"];
"application/json": components["schemas"]["BatchStatus"];
};
};
/** @description Validation Error */

View File

@@ -8,8 +8,8 @@ import {
InvocationStartedEvent,
ModelLoadCompletedEvent,
ModelLoadStartedEvent,
ProcessorStatusChangedEvent,
QueueItemStatusChangedEvent,
QueueStatusChangedEvent,
SessionRetrievalErrorEvent,
} from 'services/events/types';
@@ -235,17 +235,17 @@ export const appSocketQueueItemStatusChanged = createAction<{
}>('socket/appSocketQueueItemStatusChanged');
/**
* Socket.IO Queue Status Changed
* Socket.IO Processor Status Changed
*
* Do not use. Only for use in middleware.
*/
export const socketQueueStatusChanged = createAction<{
data: QueueStatusChangedEvent;
}>('socket/socketQueueStatusChanged');
export const socketProcessorStatusChanged = createAction<{
data: ProcessorStatusChangedEvent;
}>('socket/socketProcessorStatusChanged');
/**
* App-level Queue Status Changed
* App-level Processor Status Changed
*/
export const appSocketQueueStatusChanged = createAction<{
data: QueueStatusChangedEvent;
}>('socket/appSocketQueueStatusChanged');
export const appSocketProcessorStatusChanged = createAction<{
data: ProcessorStatusChangedEvent;
}>('socket/appSocketProcessorStatusChanged');

View File

@@ -1,3 +1,4 @@
import { components } from 'services/api/schema';
import { O } from 'ts-toolbelt';
import {
BaseModelType,
@@ -6,7 +7,6 @@ import {
ModelType,
SubModelType,
} from '../api/types';
import { components } from 'services/api/schema';
/**
* A progress image, we get one for each step in the generation
@@ -147,14 +147,16 @@ export type QueueItemStatusChangedEvent = {
batch_id: string;
status: components['schemas']['SessionQueueItemDTO']['status'];
};
/**
* A `queue_status_changed` socket.io event.
*
* @example socket.on('queue_status_changed', (data: QueueItemStatusChangedEvent) => { ... }
*/
export type QueueStatusChangedEvent = {
started: boolean;
stop_after_current: boolean;
export type ProcessorStatusChangedEvent = {
is_started: boolean;
is_processing: boolean;
is_stop_pending: boolean;
};
export type ClientEmitSubscribeSession = {
@@ -186,6 +188,7 @@ export type ServerToClientEvents = {
session_retrieval_error: (payload: SessionRetrievalErrorEvent) => void;
invocation_retrieval_error: (payload: InvocationRetrievalErrorEvent) => void;
queue_item_status_changed: (payload: QueueItemStatusChangedEvent) => void;
processor_status_changed: (payload: ProcessorStatusChangedEvent) => void;
};
export type ClientToServerEvents = {
@@ -195,4 +198,6 @@ export type ClientToServerEvents = {
unsubscribe_session: (payload: ClientEmitUnsubscribeSession) => void;
subscribe_queue: (payload: ClientEmitSubscribeQueue) => void;
unsubscribe_queue: (payload: ClientEmitUnsubscribeQueue) => void;
subscribe_processor: () => void;
unsubscribe_processor: () => void;
};

View File

@@ -16,6 +16,7 @@ import {
socketInvocationStarted,
socketModelLoadCompleted,
socketModelLoadStarted,
socketProcessorStatusChanged,
socketQueueItemStatusChanged,
socketSessionRetrievalError,
} from '../actions';
@@ -40,6 +41,8 @@ export const setEventListeners = (arg: SetEventListenersArg) => {
dispatch(socketConnected());
const queue_id = $queueId.get();
socket.emit('subscribe_queue', { queue_id });
socket.emit('subscribe_processor');
});
socket.on('connect_error', (error) => {
@@ -169,4 +172,8 @@ export const setEventListeners = (arg: SetEventListenersArg) => {
}
dispatch(socketQueueItemStatusChanged({ data }));
});
socket.on('processor_status_changed', (data) => {
dispatch(socketProcessorStatusChanged({ data }));
});
};