test_xinference_provider.py 4.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131
  1. import pytest
  2. from unittest.mock import patch, MagicMock
  3. import json
  4. from core.model_providers.models.entity.model_params import ModelType
  5. from core.model_providers.providers.base import CredentialsValidateFailedError
  6. from core.model_providers.providers.xinference_provider import XinferenceProvider
  7. from models.provider import ProviderType, Provider, ProviderModel
  # Provider identifier stored on the Provider rows built by the tests below.
  PROVIDER_NAME = 'xinference'

  # Provider class under test; the tests call it through this alias.
  MODEL_PROVIDER_CLASS = XinferenceProvider

  # Credential fixture. Tests take a .copy() of it so this dict is never mutated.
  VALIDATE_CREDENTIAL = {
      'model_uid': 'fake-model-uid',
      'server_url': 'http://127.0.0.1:9997/'
  }
  def encrypt_side_effect(tenant_id, encrypt_key):
      """Fake encryption used as a mock ``side_effect``.

      Instead of encrypting, tag the value so tests can recognise it.

      :param tenant_id: accepted to match the patched call signature; ignored.
      :param encrypt_key: the plain value to "encrypt".
      :return: ``encrypt_key`` prefixed with ``'encrypted_'``.
      """
      return f'encrypted_{encrypt_key}'
  def decrypt_side_effect(tenant_id, encrypted_key):
      """Fake decryption used as a mock ``side_effect``.

      Removes a single leading ``'encrypted_'`` tag. Only the prefix is
      stripped, so a value that itself contains ``'encrypted_'`` further in is
      returned intact.

      :param tenant_id: accepted to match the patched call signature; ignored.
      :param encrypted_key: the tagged value to "decrypt".
      :return: ``encrypted_key`` without its leading ``'encrypted_'`` tag, or
          unchanged when the tag is absent.
      """
      prefix = 'encrypted_'
      if encrypted_key.startswith(prefix):
          return encrypted_key[len(prefix):]
      return encrypted_key
  def test_is_credentials_valid_or_raise_valid(mocker):
      """Validation must not raise for the well-formed credential fixture.

      ``XinferenceLLM._call`` is patched to return a canned string, so the
      test does not depend on that method's real implementation.
      """
      mocker.patch(
          'core.third_party.langchain.llms.xinference_llm.XinferenceLLM._call',
          return_value="abc"
      )

      # Passing means no exception was raised; there is nothing to assert on.
      MODEL_PROVIDER_CLASS.is_model_credentials_valid_or_raise(
          model_name='username/test_model_name',
          model_type=ModelType.TEXT_GENERATION,
          credentials=VALIDATE_CREDENTIAL.copy()
      )
  def test_is_credentials_valid_or_raise_invalid():
      """Validation must raise ``CredentialsValidateFailedError`` on bad input."""
      # Empty credentials: neither 'model_uid' nor 'server_url' is supplied.
      with pytest.raises(CredentialsValidateFailedError):
          MODEL_PROVIDER_CLASS.is_model_credentials_valid_or_raise(
              model_name='test_model_name',
              model_type=ModelType.TEXT_GENERATION,
              credentials={}
          )

      # Only 'server_url' is supplied, with a value that is not a URL;
      # 'model_uid' is still missing.
      with pytest.raises(CredentialsValidateFailedError):
          MODEL_PROVIDER_CLASS.is_model_credentials_valid_or_raise(
              model_name='test_model_name',
              model_type=ModelType.TEXT_GENERATION,
              credentials={'server_url': 'invalid'}
          )
  @patch('core.helper.encrypter.encrypt_token', side_effect=encrypt_side_effect)
  def test_encrypt_model_credentials(mock_encrypt, mocker):
      """``encrypt_model_credentials`` must encrypt the ``server_url`` entry.

      ``encrypt_token`` is replaced by ``encrypt_side_effect`` so the result is
      predictable, and ``_get_extra_credentials`` is patched to return fixed
      metadata.
      """
      # The value expected to be handed to the encrypter (same as the fixture).
      server_url = 'http://127.0.0.1:9997/'

      mocker.patch(
          'core.model_providers.providers.xinference_provider.XinferenceProvider._get_extra_credentials',
          return_value={
              'model_handle_type': 'generate',
              'model_format': 'ggmlv3'
          }
      )

      result = MODEL_PROVIDER_CLASS.encrypt_model_credentials(
          tenant_id='tenant_id',
          model_name='test_model_name',
          model_type=ModelType.TEXT_GENERATION,
          credentials=VALIDATE_CREDENTIAL.copy()
      )

      # assert_called_with checks the most recent call to the mock.
      mock_encrypt.assert_called_with('tenant_id', server_url)
      assert result['server_url'] == f'encrypted_{server_url}'
  @patch('core.helper.encrypter.decrypt_token', side_effect=decrypt_side_effect)
  def test_get_model_credentials_custom(mock_decrypt, mocker):
      """``get_model_credentials`` must return the decrypted ``server_url``.

      The database query is mocked to yield a ``ProviderModel`` whose stored
      config carries a tagged ("encrypted") ``server_url``; ``decrypt_token``
      is replaced by ``decrypt_side_effect``.
      """
      provider = Provider(
          id='provider_id',
          tenant_id='tenant_id',
          provider_name=PROVIDER_NAME,
          provider_type=ProviderType.CUSTOM.value,
          encrypted_config=None,
          is_valid=True,
      )

      # Build the stored form of the credentials: server_url carries the tag.
      encrypted_credential = VALIDATE_CREDENTIAL.copy()
      encrypted_credential['server_url'] = 'encrypted_' + encrypted_credential['server_url']

      # session.query(...).filter(...).first() -> the stored ProviderModel row.
      mock_query = MagicMock()
      mock_query.filter.return_value.first.return_value = ProviderModel(
          encrypted_config=json.dumps(encrypted_credential)
      )
      mocker.patch('extensions.ext_database.db.session.query', return_value=mock_query)

      model_provider = MODEL_PROVIDER_CLASS(provider=provider)
      result = model_provider.get_model_credentials(
          model_name='test_model_name',
          model_type=ModelType.TEXT_GENERATION
      )

      assert result['server_url'] == 'http://127.0.0.1:9997/'
  @patch('core.helper.encrypter.decrypt_token', side_effect=decrypt_side_effect)
  def test_get_model_credentials_obfuscated(mock_decrypt, mocker):
      """With ``obfuscated=True`` the middle of ``server_url`` must be masked.

      Same setup as the non-obfuscated case: the database query is mocked to
      yield a ``ProviderModel`` with a tagged ``server_url`` and
      ``decrypt_token`` is replaced by ``decrypt_side_effect``.
      """
      provider = Provider(
          id='provider_id',
          tenant_id='tenant_id',
          provider_name=PROVIDER_NAME,
          provider_type=ProviderType.CUSTOM.value,
          encrypted_config=None,
          is_valid=True,
      )

      # Build the stored form of the credentials: server_url carries the tag.
      encrypted_credential = VALIDATE_CREDENTIAL.copy()
      encrypted_credential['server_url'] = 'encrypted_' + encrypted_credential['server_url']

      # session.query(...).filter(...).first() -> the stored ProviderModel row.
      mock_query = MagicMock()
      mock_query.filter.return_value.first.return_value = ProviderModel(
          encrypted_config=json.dumps(encrypted_credential)
      )
      mocker.patch('extensions.ext_database.db.session.query', return_value=mock_query)

      model_provider = MODEL_PROVIDER_CLASS(provider=provider)
      result = model_provider.get_model_credentials(
          model_name='test_model_name',
          model_type=ModelType.TEXT_GENERATION,
          obfuscated=True
      )

      # Drop the first 6 and last 2 characters; what remains (len - 8 chars,
      # never negative) is the part that must consist solely of '*'.
      middle_token = result['server_url'][6:-2]
      assert len(middle_token) == max(len(VALIDATE_CREDENTIAL['server_url']) - 8, 0)
      assert all(char == '*' for char in middle_token)