import time  # FIX: `time.sleep` is called below but `time` was never imported in this snippet.

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.func import entrypoint, task
from langgraph.types import interrupt


@task
def write_essay(topic: str) -> str:
    """Write an essay about the given topic."""
    time.sleep(1)  # A placeholder for a long-running task.
    return f"An essay about topic: {topic}"


@entrypoint(checkpointer=InMemorySaver())
def workflow(topic: str) -> dict:
    """A simple workflow that writes an essay and asks for a review."""
    essay = write_essay("cat").result()
    is_approved = interrupt({
        # Any json-serializable payload provided to interrupt as argument.
        # It will be surfaced on the client side as an Interrupt when streaming data
        # from the workflow.
        "essay": essay,  # The essay we want reviewed.
        # We can add any additional information that we need.
        # For example, introduce a key called "action" with some instructions.
        "action": "Please approve/reject the essay",
    })
    return {
        "essay": essay,  # The essay that was generated
        "is_approved": is_approved,  # Response from HIL
    }
import time
import uuid

from langgraph.func import entrypoint, task
from langgraph.types import interrupt
from langgraph.checkpoint.memory import InMemorySaver


@task
def write_essay(topic: str) -> str:
    """Write an essay about the given topic."""
    time.sleep(1)  # This is a placeholder for a long-running task.
    return f"An essay about topic: {topic}"


@entrypoint(checkpointer=InMemorySaver())
def workflow(topic: str) -> dict:
    """A simple workflow that writes an essay and asks for a review."""
    essay = write_essay("cat").result()
    is_approved = interrupt(
        {
            # Any json-serializable payload provided to interrupt as argument.
            # It will be surfaced on the client side as an Interrupt when streaming data
            # from the workflow.
            "essay": essay,  # The essay we want reviewed.
            # We can add any additional information that we need.
            # For example, introduce a key called "action" with some instructions.
            "action": "Please approve/reject the essay",
        }
    )
    return {
        "essay": essay,  # The essay that was generated
        "is_approved": is_approved,  # Response from HIL
    }


# A fresh thread id scopes this run's checkpoint history in the InMemorySaver.
thread_id = str(uuid.uuid4())
config = {"configurable": {"thread_id": thread_id}}
# Stream until the workflow pauses at the interrupt; the Interrupt payload
# is surfaced under the "__interrupt__" key, as shown below.
for item in workflow.stream("cat", config):
    print(item)
# > {'write_essay': 'An essay about topic: cat'}
# > {
# >     '__interrupt__': (
# >         Interrupt(
# >             value={
# >                 'essay': 'An essay about topic: cat',
# >                 'action': 'Please approve/reject the essay'
# >             },
# >             id='b9b2b9d788f482663ced6dc755c9e981'
# >         ),
# >     )
# > }
一篇文章已经撰写完毕，准备进行审查。一旦审查完成，我们就可以继续工作流程：
from langgraph.types import Command

# Collect the reviewer's decision (e.g., from a UI).
# A bool is used here, but any json-serializable value works.
human_review = True

# Resume the paused run: the value passed via `resume` becomes the
# return value of the pending `interrupt(...)` call inside the workflow.
for update in workflow.stream(Command(resume=human_review), config):
    print(update)
{'workflow': {'essay': 'An essay about topic: cat', 'is_approved': True}}
from langgraph.func import entrypoint


# NOTE: `checkpointer` and `result` are placeholders supplied by the
# surrounding context; this snippet shows only the entrypoint shape.
@entrypoint(checkpointer=checkpointer)
def my_workflow(some_input: dict) -> int:
    """Template for a synchronous workflow entrypoint."""
    # some logic that may involve long-running tasks like API calls,
    # and may be interrupted for human-in-the-loop.
    ...
    return result
from langgraph.func import entrypoint


# NOTE: `checkpointer` and `result` are placeholders supplied by the
# surrounding context; this is the async variant of the entrypoint shape.
@entrypoint(checkpointer=checkpointer)
async def my_workflow(some_input: dict) -> int:
    """Template for an asynchronous workflow entrypoint."""
    # some logic that may involve long-running tasks like API calls,
    # and may be interrupted for human-in-the-loop
    ...
    return result
from langchain_core.runnables import RunnableConfig
from langgraph.func import entrypoint
from langgraph.store.base import BaseStore
from langgraph.store.memory import InMemoryStore

in_memory_store = InMemoryStore(...)  # An instance of InMemoryStore for long-term memory


# Signature-only snippet: keyword-only parameters after `*` are injected
# by the entrypoint at call time; the caller passes only `some_input`.
# NOTE(review): `Any` and `StreamWriter` are referenced but not imported
# in this snippet — confirm the imports on the full docs page.
@entrypoint(
    checkpointer=checkpointer,  # Specify the checkpointer
    store=in_memory_store  # Specify the store
)
def my_workflow(
    some_input: dict,  # The input (e.g., passed via `invoke`)
    *,
    previous: Any = None,  # For short-term memory
    store: BaseStore,  # For long-term memory
    writer: StreamWriter,  # For streaming custom data
    config: RunnableConfig  # For accessing the configuration passed to the entrypoint
) -> ...:
@entrypoint(checkpointer=checkpointer)
def my_workflow(number: int, *, previous: Any = None) -> entrypoint.final[int, int]:
    """Return the previously checkpointed value while saving 2 * number."""
    prior = 0 if previous is None else previous
    # `value` is what the caller receives; `save` is written to the
    # checkpoint and arrives as `previous` on the next invocation of
    # this thread.
    return entrypoint.final(value=prior, save=2 * number)


config = {
    "configurable": {
        "thread_id": "1"
    }
}

my_workflow.invoke(3, config)  # 0 (previous was None)
my_workflow.invoke(1, config)  # 6 (previous was 2 * 3 from the previous invocation)
# ANTI-PATTERN (intentional): the file write sits directly in the
# entrypoint body, so it re-executes when the workflow is resumed.
@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
    """Example of a side effect placed directly in the entrypoint body."""
    # This code will be executed a second time when resuming the workflow.
    # Which is likely not what you want.
    with open("output.txt", "w") as f:
        f.write("Side effect executed")
    value = interrupt("question")
    return value
在这个例子中，副作用被封装在一个任务中，确保在恢复执行时保持一致性。
from langgraph.func import task


@task
def write_to_file():
    """Write the side-effect marker file (wrapped in a task so it is
    tracked by the checkpointer rather than re-run naively on resume)."""
    with open("output.txt", "w") as f:
        f.write("Side effect executed")


@entrypoint(checkpointer=checkpointer)
def my_workflow(inputs: dict) -> int:
    """Corrected version of the previous example: the side effect lives in a task."""
    # The side effect is now encapsulated in a task.
    write_to_file().result()
    value = interrupt("question")
    return value