|
from g4f.Provider import __all__, ProviderUtils |
|
from g4f import ChatCompletion |
|
import concurrent.futures |
|
|
|
_ = [ |
|
'BaseProvider', |
|
'AsyncProvider', |
|
'AsyncGeneratorProvider', |
|
'RetryProvider' |
|
] |
|
|
|
def test_provider(provider): |
|
try: |
|
provider = (ProviderUtils.convert[provider]) |
|
if provider.working and not provider.needs_auth: |
|
print('testing', provider.__name__) |
|
completion = ChatCompletion.create(model='gpt-3.5-turbo', |
|
messages=[{"role": "user", "content": "hello"}], provider=provider) |
|
return completion, provider.__name__ |
|
except Exception as e: |
|
|
|
return None |
|
|
|
with concurrent.futures.ThreadPoolExecutor() as executor: |
|
futures = [ |
|
executor.submit(test_provider, provider) |
|
for provider in __all__ |
|
if provider not in _ |
|
] |
|
for future in concurrent.futures.as_completed(futures): |
|
if result := future.result(): |
|
print(f'{result[1]} | {result[0]}') |