使用langchain异步获取网络信息
前言
从来没想过在接入通义千问的时候还会遇到NotImplementedError
。实在难以理解,处理过后才明白问题。现在总结后给出结果。
发现问题
我们来看个例子。就比如这段代码:
摘自ranying666/langchain_tongyi_practical中的5-langchain-search-document_loaders.py
1 | loader = AsyncChromiumLoader(["https://dataea.cn/okr-keyresult-checklist/"]) |
如果把这段直接接入streamlit
,那就直接报错:
NotImplementedError
比较离谱的是,只有这么一个错误,没有其他信息。
分析问题
很难理解为什么是这个错。
随着排查的进行,发现问题好像出在AsyncChromiumLoader
中。
AsyncChromiumLoader
继承自BaseLoader
,而BaseLoader
的load
方法中,调用的却是lazy_load
。先不用看这个方法的具体内容,就看这个方法的名字,你大概也能猜出来什么问题了:
懒加载带来的未能实例化的问题。
简单地说,就是:streamlit
已在前面飞,loader.load()
还在后面追。终于,追不上了,就爆炸了。而刚好的是,lazy_load
中抛出的异常就是这个NotImplementedError
。
众所周知,streamlit
构建网页的过程是单线程的。
所以,当我们需要请求内容的时候,使用异步请求的AsyncChromiumLoader
就会出现这种问题。
那么,该怎么办呢?
阻塞?
怎么办呢?阻塞,对吧?很容易想到。
于是会想当然的这么用:
1 | loader = AsyncChromiumLoader(["https://dataea.cn/okr-keyresult-checklist/"]) |
看着很直观,检测html
是否有返回值。
如果真这么简单的话我也不会把它写在这里(🤷♂️)。
结果就是,还是报错。这又是为什么呢?逻辑没问题呀?
逻辑是没问题,问题出在底层。作为一个异步函数,他怎么可能没有返回值呢?
我们来回顾一下load
方法:
1 | # Sub-classes should not implement this method directly. Instead, they should implement the lazy load method. |
那么,lazy_load
是怎么回事呢?
1 | def lazy_load(self) -> Iterator[Document]: |
这里比较有意思的是,对实例化的AsyncChromiumLoader
对象(就是这个self
),判断AsyncChromiumLoader.load
与BaseLoader.load
是否一致。
其实这里比较的是地址信息,因为子类如果重写了这个load
方法,那么地址就会被改变。如果不一致的话,就会返回一个迭代器,这个迭代器就是为了后续过程中无论返回的是否是list
,都能够迭代。
听起来没问题。
但是,懒加载呢?
1 | async def aload(self) -> List[Document]: |
比较神奇的就出现在这里了。alazy_load
方法最终给出来的就是一个Document
类的迭代器,然后最终通过yield
给到调用方,直到doc
在迭代过程中达到了done
。
但是呢,doc
变量的结果是await run_in_executor(None, next, iterator, done)
,即使run_in_executor
返回的是一个迭代器对象,最终由await
进行处理,所以是有返回值的,但是返回的是未来需要返回的,是asyncio.futures.Future
类。这一点完全可以类比Java
中的Future
对象。
所以,最终而言,AsyncChromiumLoader.load
并不是直到结束才返回值,而是在执行的过程中不断地通过yield
给出返回值,只是在await
最终处理为AsyncIterator[Document]
类型。
阻塞!
为了让streamlit
等待异步请求,就需要主线程停下来,直到请求结束了才能继续执行。
那这回该怎么办呢?直接用asyncio
与playwright
给阻塞掉。
首先,我们需要利用asyncio
创建一个阻塞事件,并设置所有的事件都需要在阻塞事件结束后执行。
其次,在执行这个阻塞之间的时候,我们依然使用异步请求,只不过是所有的事件都在等我们。
于是,可以给出代码如下:
1 | import asyncio |
其实这里面最核心的就是asyncio
下的run_until_complete
函数:
1 | def run_until_complete(self, future): |
这个函数最大的特点就是会创建一个任务并执行。直到任务执行完成或者报错中断之前,其他所有任务都得等着这个任务的回调函数。
于是,这个函数就阻塞了streamlit
的进程,直到异步任务完成。
附:用什么阻塞streamlit进程
其实这段文字本来应该接在上面这段的。但是这个坑实在太神奇了,单独拉出来说明。
这里面还有一个很神奇的坑:用什么东西阻塞。
就像上面这段代码,针对Windows
、Linux
、Darwin
,分别采用了ProactorEventLoop
、new_event_loop
、SelectorEventLoop
阻塞streamlit
进程。
如果在Linux
平台中使用ProactorEventLoop
,那么streamlit
进程依然不会阻塞,因为他们都只能在各自的操作系统中起作用。