Use the Streaming Feature of the Kimi API
When the Kimi large language model receives a question from a user, it first performs inference and then generates the response one Token at a time. In the examples from our first two chapters, we waited for the model to generate all Tokens before printing its response, which usually takes several seconds. If the question is complex and the response is long, the wait for the complete response can stretch to 10 or even 20 seconds, which noticeably degrades the user experience. To provide timely feedback to users, we offer streaming output, known as Streaming. This chapter explains the principles of Streaming and illustrates them with actual code:
- How to use streaming output;
- Common issues when using streaming output;
- How to handle streaming output without using the Python SDK;
How to Use Streaming Output
Streaming, in a nutshell, means that whenever the Kimi large language model generates a certain number of Tokens (usually 1 Token), it immediately sends these Tokens to the client, instead of waiting for all Tokens to be generated before sending them to the client. When you chat with Kimi AI Assistant, the assistant's response appears character by character, which is one manifestation of streaming output. Streaming allows users to see the first Token output by the Kimi large language model immediately, reducing wait time.
You can use streaming output in this way (stream=True) and get the streaming response:
from openai import OpenAI

client = OpenAI(
    api_key = "MOONSHOT_API_KEY", # Replace MOONSHOT_API_KEY with the API Key you obtained from the Kimi Open Platform
    base_url = "https://api.moonshot.ai/v1",
)

stream = client.chat.completions.create(
    model = "kimi-k2.5",
    messages = [
        {"role": "system", "content": "You are Kimi, an artificial intelligence assistant provided by Moonshot AI, who is better at conversing in Chinese and English. You provide users with safe, helpful, and accurate answers. At the same time, you refuse to answer any questions related to terrorism, racism, pornography, and violence. Moonshot AI is a proper noun and should not be translated into other languages."},
        {"role": "user", "content": "Hello, my name is Li Lei, what is 1+1?"}
    ],
    stream=True, # <-- Note here, we enable streaming output mode by setting stream=True
)

# When streaming output mode is enabled (stream=True), the content returned by the SDK also changes. We no longer directly access the choice in the return value
# Instead, we access each individual chunk in the return value through a for loop
for chunk in stream:
    # Here, the structure of each chunk is similar to the previous completion, but the message field is replaced with the delta field
    delta = chunk.choices[0].delta # <-- The message field is replaced with the delta field
    if delta.content:
        # When printing the content, since it is streaming output, to ensure the coherence of the sentence, we do not add
        # line breaks manually, so we set end="" to cancel the line break of print.
        print(delta.content, end="")

Common Issues When Using Streaming Output
Now that you have successfully run the above code and understood the basic principles of streaming output, let's discuss some details and common issues of streaming output so that you can better implement your business logic.
Interface Details
When streaming output mode is enabled (stream=True), the Kimi large language model no longer returns a response in JSON format (Content-Type: application/json). Instead, it responds with Content-Type: text/event-stream, the format used by Server-Sent Events (SSE). This response format allows the server to continuously send data to the client. In the context of the Kimi large language model, it can be understood as the server continuously sending Tokens to the client.
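A client that supports both modes can branch on this header before deciding how to parse the body. The helper below is a minimal sketch: is_event_stream is a hypothetical name, and tolerating parameters such as charset after the media type is an assumption about what a server may send.

```python
def is_event_stream(content_type: str) -> bool:
    """Return True if a Content-Type header value denotes an SSE response."""
    # The media type comes before any parameters such as "; charset=utf-8"
    media_type = content_type.split(";", 1)[0].strip().lower()
    return media_type == "text/event-stream"


print(is_event_stream("text/event-stream; charset=utf-8"))
print(is_event_stream("application/json"))
```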
When you look at the HTTP response body of SSE, it looks like this:
data: {"id":"cmpl-1305b94c570f447fbde3180560736287","object":"chat.completion.chunk","created":1698999575,"model":"kimi-k2.5","choices":[{"index":0,"delta":{"role":"assistant","content":""},"finish_reason":null}]}
data: {"id":"cmpl-1305b94c570f447fbde3180560736287","object":"chat.completion.chunk","created":1698999575,"model":"kimi-k2.5","choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}
...
data: {"id":"cmpl-1305b94c570f447fbde3180560736287","object":"chat.completion.chunk","created":1698999575,"model":"kimi-k2.5","choices":[{"index":0,"delta":{"content":"."},"finish_reason":null}]}
data: {"id":"cmpl-1305b94c570f447fbde3180560736287","object":"chat.completion.chunk","created":1698999575,"model":"kimi-k2.5","choices":[{"index":0,"delta":{},"finish_reason":"stop","usage":{"prompt_tokens":19,"completion_tokens":13,"total_tokens":32}}]}
data: [DONE]

In the SSE response body, each data chunk by convention starts with data: , followed by a valid JSON object, and ends with two newline characters \n\n. Finally, when all data chunks have been transmitted, data: [DONE] indicates that the transmission is complete, at which point the network connection can be closed.
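To make these framing rules concrete, here is a minimal sketch that splits a raw SSE body into decoded data chunks. The function name parse_sse_body and the shortened sample body are assumptions made for illustration; the sketch only understands single-line data: payloads and is not a complete SSE parser.

```python
import json


def parse_sse_body(body: str) -> list:
    """Split a raw SSE body into decoded JSON chunks, stopping at [DONE].

    Hypothetical helper for illustration; it only understands `data:` lines.
    """
    chunks = []
    # Data chunks are separated by a blank line, i.e. two newline characters
    for event in body.split("\n\n"):
        event = event.strip()
        if not event.startswith("data:"):
            continue  # Skip the empty trailing segment or unknown fields
        payload = event[len("data:"):].strip()
        if payload == "[DONE]":
            break  # End-of-stream marker; nothing after it is parsed
        chunks.append(json.loads(payload))
    return chunks


sample_body = (
    'data: {"choices":[{"index":0,"delta":{"role":"assistant","content":""},"finish_reason":null}]}\n\n'
    'data: {"choices":[{"index":0,"delta":{"content":"Hello"},"finish_reason":null}]}\n\n'
    'data: [DONE]\n\n'
)

chunks = parse_sse_body(sample_body)
print(len(chunks))
print(chunks[1]["choices"][0]["delta"]["content"])
```

Parsing a complete body like this is only useful for inspection or tests; a real client reads the response incrementally so that each chunk can be shown as soon as it arrives.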
Token Calculation
When using the streaming output mode, there are two ways to calculate tokens. The most straightforward and accurate method is to wait until all data chunks have been transmitted and then check the prompt_tokens, completion_tokens, and total_tokens in the usage field of the last data chunk.
...
data: {"id":"cmpl-1305b94c570f447fbde3180560736287","object":"chat.completion.chunk","created":1698999575,"model":"kimi-k2.5","choices":[{"index":0,"delta":{},"finish_reason":"stop","usage":{"prompt_tokens":19,"completion_tokens":13,"total_tokens":32}}]}
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
Check the number of tokens generated by the current request through the usage field in the last data chunk
data: [DONE]

However, in practice, streaming output can be interrupted by uncontrollable factors such as network disconnections or client-side errors. In such cases, the last data chunk may not have been fully transmitted, making it impossible to know the total number of tokens consumed by the request. To avoid this issue, we recommend saving the content of each data chunk as it is received and then using the token calculation interface to compute the total consumption after the request ends, regardless of whether it was successful or not. Here is an example code snippet:
import os

import httpx
from openai import OpenAI

client = OpenAI(
    api_key = "MOONSHOT_API_KEY", # Replace MOONSHOT_API_KEY with the API Key you obtained from the Kimi Open Platform
    base_url = "https://api.moonshot.ai/v1",
)

stream = client.chat.completions.create(
    model = "kimi-k2.5",
    messages = [
        {"role": "system", "content": "You are Kimi, an AI assistant provided by Moonshot AI, who excels in Chinese and English conversations. You provide users with safe, helpful, and accurate answers while rejecting any questions related to terrorism, racism, or explicit content. Moonshot AI is a proper noun and should not be translated."},
        {"role": "user", "content": "Hello, my name is Li Lei. What is 1+1?"}
    ],
    stream=True, # <-- Note here, we enable streaming output mode by setting stream=True
)

def estimate_token_count(input: str) -> int:
    """
    Implement your token calculation logic here, or directly call our token calculation interface to compute tokens.
    https://api.moonshot.ai/v1/tokenizers/estimate-token-count
    """
    header = {
        "Authorization": f"Bearer {os.environ['MOONSHOT_API_KEY']}",
    }
    data = {
        "model": "kimi-k2.5",
        "messages": [
            {"role": "user", "content": input},
        ]
    }
    r = httpx.post("https://api.moonshot.ai/v1/tokenizers/estimate-token-count", headers=header, json=data)
    r.raise_for_status()
    return r.json()["data"]["total_tokens"]

completion = []
try:
    for chunk in stream:
        delta = chunk.choices[0].delta
        if delta.content:
            completion.append(delta.content)
finally:
    # This runs even if the stream is interrupted, so the content received so far is still counted
    print("completion_tokens:", estimate_token_count("".join(completion)))

How to Terminate Output
If you want to stop the streaming output, you can simply close the HTTP connection or discard any subsequent data chunks. For example:
for chunk in stream:
    if condition:
        break

How to Handle Streaming Output Without Using an SDK
If you prefer not to use the Python SDK and instead want to call the HTTP API of the Kimi large language model directly (for example, because you are using a language without an SDK, or you have business logic that the SDK cannot meet), the following example shows how to properly handle the SSE response body over HTTP. We still use Python here, with detailed explanations provided in comments.
import json # We use the json module to deserialize each data chunk
import os

import httpx # We use the httpx library to make our HTTP requests

payload = {
    "model": "kimi-k2.5",
    "messages": [
        # Specific messages
    ],
    "stream": True,
}
headers = {
    "Authorization": f"Bearer {os.environ['MOONSHOT_API_KEY']}",
}

# Use httpx.stream to send a chat request to the Kimi large language model and get the response r.
# Unlike httpx.post, httpx.stream does not buffer the whole body, so data chunks can be handled as they arrive
with httpx.stream("POST", "https://api.moonshot.ai/v1/chat/completions", headers=headers, json=payload) as r:
    if r.status_code != 200:
        r.read() # A streamed response must be read before r.text can be accessed
        raise Exception(r.text)

    data = ""
    # Here, we use the iter_lines method to read the response body line by line
    for line in r.iter_lines():
        # Remove leading and trailing spaces from each line to better handle data chunks
        line = line.strip()
        # Next, we need to handle three different cases:
        # 1. If the current line is empty, it indicates that the previous data chunk has been received (as mentioned earlier, the data chunk transmission ends with two newline characters), we can deserialize the data chunk and print the corresponding content;
        # 2. If the current line is not empty and starts with data:, it indicates the start of a data chunk transmission, we remove the data: prefix and first check if it is the end symbol [DONE], if not, save the data content to the data variable;
        # 3. If the current line is not empty but does not start with data:, it indicates that the current line still belongs to the previous data chunk being transmitted, we append the content of the current line to the end of the data variable;
        if len(line) == 0:
            if not data:
                continue # No data chunk is pending, for example after consecutive empty lines
            chunk = json.loads(data)
            # The processing logic here can be replaced with your business logic, printing is just to demonstrate the process
            choice = chunk["choices"][0]
            usage = choice.get("usage")
            if usage:
                print("total_tokens:", usage["total_tokens"])
            delta = choice["delta"]
            role = delta.get("role")
            if role:
                print("role:", role)
            content = delta.get("content")
            if content:
                print(content, end="")
            data = "" # Reset data
        elif line.startswith("data: "):
            # Slice off the prefix; str.lstrip("data: ") would strip a set of characters rather than the prefix itself
            data = line[len("data: "):]
            # When the data chunk content is [DONE], it indicates that all data chunks have been sent, and the network connection can be disconnected
            if data == "[DONE]":
                break
        else:
            data = data + "\n" + line # We still add a newline character when appending content, as this data chunk may intentionally format the data in separate lines

The above is the process of handling streaming output using Python as an example. If you are using another language, you can handle streaming output in the same way. The basic steps are as follows:
- Initiate an HTTP request and set the stream parameter in the request body to true;
- Receive the response from the server. Note that if the Content-Type in the response Headers is text/event-stream, it indicates that the response content is a streaming output;
- Read the response content line by line and parse the data chunks (the data chunks are presented in JSON format). Pay attention to determining the start and end positions of the data chunks through the data: prefix and newline character \n;
- Determine whether the transmission is complete by checking if the current data chunk content is [DONE];
Note: Always use data: [DONE] to determine if the data has been fully transmitted, rather than using finish_reason or other methods. If you do not receive the data: [DONE] message chunk, even if you have obtained the information finish_reason=stop, you should not consider the data chunk transmission as complete. In other words, until you receive the data: [DONE] data chunk, the message should be considered incomplete.
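The rule in this note can be sketched as a small helper that reports a stream as complete only after it has seen [DONE]. The function name collect_stream and its input (an iterable of data: payload strings with the prefix already removed) are assumptions made for this illustration.

```python
import json


def collect_stream(payloads):
    """Return (text, complete) for an iterable of `data:` payload strings.

    `complete` is True only if the [DONE] marker was received; a
    finish_reason of "stop" alone does not count. Hypothetical helper.
    """
    parts = []
    complete = False
    for payload in payloads:
        if payload == "[DONE]":
            complete = True
            break
        choice = json.loads(payload)["choices"][0]
        content = choice["delta"].get("content")
        if content:
            parts.append(content)
    return "".join(parts), complete


hello = '{"choices":[{"index":0,"delta":{"content":"Hi"},"finish_reason":null}]}'
stop = '{"choices":[{"index":0,"delta":{},"finish_reason":"stop"}]}'

# The connection dropped after finish_reason=stop but before [DONE]
print(collect_stream([hello, stop]))
# The stream ended normally
print(collect_stream([hello, stop, "[DONE]"]))
```

Returning the completeness flag alongside the text lets the caller decide whether to retry, discard, or keep a partial message.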
During the streaming output process, only the content field is streamed, meaning each data chunk contains a portion of the content tokens. For fields that do not need to be streamed, such as role and usage, we usually present them all at once in the first or last data chunk, rather than including the role and usage fields in every data chunk (specifically, the role field will only appear in the first data chunk and will not be included in subsequent data chunks; the usage field will only appear in the last data chunk and will not be included in the preceding data chunks).
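As a minimal sketch of this field layout, the helper below folds decoded data chunks into a single message: role is taken from the chunk that carries it, content is concatenated across chunks, and usage is read from the chunk that carries it. The name assemble_message and the sample chunks are illustrative assumptions; the chunks mirror the SSE example earlier on this page, where usage sits inside the choice object.

```python
def assemble_message(chunks):
    """Fold streamed chunks (already decoded to dicts) into one message."""
    message = {"role": None, "content": "", "usage": None}
    for chunk in chunks:
        choice = chunk["choices"][0]
        delta = choice["delta"]
        if delta.get("role"):
            message["role"] = delta["role"]  # Expected only in the first chunk
        if delta.get("content"):
            message["content"] += delta["content"]  # Streamed piece by piece
        if choice.get("usage"):
            message["usage"] = choice["usage"]  # Expected only in the last chunk
    return message


sample_chunks = [
    {"choices": [{"index": 0, "delta": {"role": "assistant", "content": ""}, "finish_reason": None}]},
    {"choices": [{"index": 0, "delta": {"content": "1+1"}, "finish_reason": None}]},
    {"choices": [{"index": 0, "delta": {"content": " equals 2."}, "finish_reason": None}]},
    {"choices": [{"index": 0, "delta": {}, "finish_reason": "stop",
                  "usage": {"prompt_tokens": 19, "completion_tokens": 13, "total_tokens": 32}}]},
]

message = assemble_message(sample_chunks)
print(message["role"], "|", message["content"], "|", message["usage"]["total_tokens"])
```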
Handling n>1
Sometimes, we want to get multiple results to choose from. To do this, set the n parameter in the request to a value greater than 1. Streaming output also supports n>1. In this case, we need some extra code that checks the index value of each choice in the current data chunk to figure out which response it belongs to. Let's illustrate this with example code:
import json # We use the json module to deserialize each data chunk
import os

import httpx # We use the httpx library to make our HTTP requests

payload = {
    "model": "kimi-k2.5",
    "messages": [
        # Specific messages go here
    ],
    "stream": True,
    "n": 2, # <-- Note here, we're asking the Kimi large language model to output 2 responses
}
headers = {
    "Authorization": f"Bearer {os.environ['MOONSHOT_API_KEY']}",
}

# Here, we pre-build a List to store different response messages. Since we set n=2, we initialize the List with 2 elements
messages = [{}, {}]

# Use httpx.stream to send a chat request to the Kimi large language model and get the response r.
# Unlike httpx.post, httpx.stream does not buffer the whole body, so data chunks can be handled as they arrive
with httpx.stream("POST", "https://api.moonshot.ai/v1/chat/completions", headers=headers, json=payload) as r:
    if r.status_code != 200:
        r.read() # A streamed response must be read before r.text can be accessed
        raise Exception(r.text)

    data = ""
    # We use the iter_lines method here to read the response body line by line
    for line in r.iter_lines():
        # Remove leading and trailing spaces from each line to better handle data chunks
        line = line.strip()
        # Next, we need to handle three different scenarios:
        # 1. If the current line is empty, it indicates that the previous data chunk has been fully received (as mentioned earlier, data chunk transmission ends with two newline characters). We can deserialize this data chunk and store the corresponding content;
        # 2. If the current line is not empty and starts with data:, it means the start of a data chunk transmission. After removing the data: prefix, we first check if it's the end marker [DONE]. If not, we save the data content to the data variable;
        # 3. If the current line is not empty but doesn't start with data:, it means this line still belongs to the previous data chunk being transmitted. We append the content of this line to the end of the data variable;
        if len(line) == 0:
            if not data:
                continue # No data chunk is pending, for example after consecutive empty lines
            chunk = json.loads(data)
            # Loop through all choices in each data chunk to get the message object corresponding to the index
            for choice in chunk["choices"]:
                index = choice["index"]
                message = messages[index]
                usage = choice.get("usage")
                if usage:
                    message["usage"] = usage
                delta = choice["delta"]
                role = delta.get("role")
                if role:
                    message["role"] = role
                content = delta.get("content")
                if content:
                    message["content"] = message.get("content", "") + content
            data = "" # Reset data
        elif line.startswith("data: "):
            # Slice off the prefix; str.lstrip("data: ") would strip a set of characters rather than the prefix itself
            data = line[len("data: "):]
            # When the data chunk content is [DONE], it means all data chunks have been sent and we can disconnect the network
            if data == "[DONE]":
                break
        else:
            data = data + "\n" + line # When we're still appending content, we add a newline character because this might be the data chunk's intentional way of displaying data on separate lines

# After assembling all messages, we print their contents separately
for index, message in enumerate(messages):
    print("index:", index)
    print("message:", json.dumps(message, ensure_ascii=False))

When n>1, the key to handling streaming output is to first determine which response message the current data chunk belongs to based on its index value, and then proceed with further processing.
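The index-based routing can also be tried offline. The sketch below feeds hand-written, interleaved chunks to a hypothetical helper named merge_by_index; it keys results by index in a dictionary instead of pre-building a list of n elements, which is a design variation rather than the approach used in the example above. The sample chunks are invented for illustration and omit fields that the helper does not read.

```python
from collections import defaultdict


def merge_by_index(chunks):
    """Concatenate streamed content per choice index (hypothetical helper)."""
    contents = defaultdict(str)
    for chunk in chunks:
        for choice in chunk["choices"]:
            # Route each piece of content to the response it belongs to
            contents[choice["index"]] += choice["delta"].get("content") or ""
    return dict(contents)


# Chunks for two responses may arrive interleaved in a single stream
interleaved = [
    {"choices": [{"index": 0, "delta": {"content": "1+1"}}]},
    {"choices": [{"index": 1, "delta": {"content": "The answer"}}]},
    {"choices": [{"index": 0, "delta": {"content": " = 2"}}]},
    {"choices": [{"index": 1, "delta": {"content": " is 2"}}]},
]

print(merge_by_index(interleaved))
```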