概述

本文简单介绍如何利用 Aliyun DNS API 编写脚本管理域名解析,并利用 Let’s Encrypt 生成和自动续期 HTTPS 证书。

2023年3月20日更新:补充了 GoDaddy 相关 API,并重构了管理脚本。

Python环境和Aliyun SDK安装

Python虚拟环境配置

1
2
3
4
5
6
7
8
9
10
11
# Create the working directory for the domain manager and enter it
mkdir -p /opt/domain_manager
cd /opt/domain_manager

# Create the virtual environment (Python 3 must already be installed; the author used Python 3.7.3)
python3 -m venv ./venv

# Activate the virtual environment; run `deactivate` to leave it again
. /opt/domain_manager/venv/bin/activate

# Upgrade the packaging tools
pip install --upgrade pip setuptools

SDK安装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# requirements.txt
aliyun-python-sdk-alidns==3.0.1
aliyun-python-sdk-core==2.13.36
aliyunsdkcore==1.0.3
certifi==2022.12.7
cffi==1.15.1
charset-normalizer==2.0.12
cryptography==39.0.1
GoDaddyPy==2.3.4
idna==3.4
jmespath==0.10.0
pycparser==2.21
pycrypto==2.6.1
requests==2.27.1
urllib3==1.26.14

# Install the dependencies (shell command, not part of requirements.txt)
pip install -r requirements.txt

域名管理脚本

我自己写了个脚本,对 SDK 做了简单封装,并以命令行工具的形式提供,读者可以根据需求自行修改或重新编写。凭证相关的信息请通过 Credentials 对象传递。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
# -*- coding: UTF-8 -*-
# Date:'2022/7/29 23:09'
__author__ = 'imaginefei'

import sys
import json
from godaddypy import Client as GoDaddyClient
from godaddypy import Account as GoDaddyAccount
from abc import ABC, abstractmethod
from optparse import OptionParser
from aliyunsdkcore.client import AcsClient
from aliyunsdkalidns.request.v20150109.AddDomainRecordRequest import AddDomainRecordRequest
from aliyunsdkalidns.request.v20150109.DescribeDomainRecordsRequest import DescribeDomainRecordsRequest
from aliyunsdkalidns.request.v20150109.DeleteDomainRecordRequest import DeleteDomainRecordRequest
from aliyunsdkalidns.request.v20150109.UpdateDomainRecordRequest import UpdateDomainRecordRequest


class Credentials:
    """
    Access credentials for a DNS provider API.

    :param key: API access key
    :param secret: API access secret
    :param region: provider region id; defaults to an empty string
    """

    def __init__(self, key, secret, region=""):
        self.key = key
        self.secret = secret
        self.region = region

    def __repr__(self):
        # Never expose the secret when the object is printed or logged.
        return "{0}(key={1!r}, secret='***', region={2!r})".format(
            type(self).__name__, self.key, self.region)


class IDomainMgr(ABC):
    """
    Abstract interface every provider-specific domain manager implements.
    """

    @abstractmethod
    def get_domain_records(self, domain, subdomain, record_type):
        """
        Look up the records of a subdomain.
        :param domain: domain name
        :param subdomain: subdomain (host record)
        :param record_type: DNS record type, e.g. A/TXT/CNAME
        """
        pass

    @abstractmethod
    def add_domain_record(self, domain, subdomain, record_type, record_value, ttl=600):
        """
        Create a record.
        :param domain: domain name
        :param subdomain: subdomain (host record)
        :param record_type: DNS record type
        :param record_value: record value
        :param ttl: time to live; defaults to 600 like update_domain_record
        """
        pass

    @abstractmethod
    def update_domain_record(self, domain, subdomain, record_type, record_value, ttl=600):
        """
        Change the value of the existing record(s) of a subdomain.
        :param domain: domain name
        :param subdomain: subdomain (host record)
        :param record_type: DNS record type
        :param record_value: new record value
        :param ttl: time to live
        """
        pass

    @abstractmethod
    def delete_domain_record(self, domain, subdomain, record_type):
        """
        Remove the record(s) of a subdomain.
        :param domain: domain name
        :param subdomain: subdomain (host record)
        :param record_type: DNS record type
        """
        pass


class AliyunDomainMgr(IDomainMgr):
    """
    Domain manager for Aliyun DNS, built on the Aliyun SDK request classes.
    """

    def __init__(self, credentials):
        """
        Create the SDK client.
        :param credentials: object providing ``key``, ``secret`` and ``region`` attributes
        """
        self.client = AcsClient(credentials.key, credentials.secret, credentials.region)

    @staticmethod
    def bytes_to_str(bytes_str):
        """
        Decode UTF-8 bytes into a str.
        :param bytes_str: bytes object
        :return: decoded string
        """
        return str(bytes_str, encoding="utf-8")

    def get_domain_records(self, domain, subdomain, record_type):
        """
        Query the records of a subdomain.
        :param domain: domain name
        :param subdomain: subdomain; sent as the RR keyword of the query
        :param record_type: record type, e.g. A/NS/MX/TXT/CNAME/SRV/AAAA/CAA/REDIRECT_URL/FORWARD_URL
        :return: raw JSON response as a string
        """
        req = DescribeDomainRecordsRequest()
        req.set_accept_format("json")
        req.set_DomainName(domain)
        req.set_RRKeyWord(subdomain)
        req.set_Type(record_type)
        return self.bytes_to_str(self.client.do_action_with_exception(req))

    def _matching_records(self, domain, subdomain, record_type):
        """
        Return the record dicts whose ``RR`` equals ``subdomain`` exactly.

        The query is keyed on an RR keyword, so the response is filtered again
        here. A response without the ``DomainRecords``/``Record`` keys yields an
        empty list instead of raising ``KeyError``.
        NOTE(review): only the records contained in this single response are
        inspected — confirm whether the API paginates large result sets.
        """
        resp = json.loads(self.get_domain_records(domain, subdomain, record_type))
        records = (resp.get("DomainRecords") or {}).get("Record") or []
        return [record for record in records if record.get("RR") == subdomain]

    def get_domain_record_ids(self, domain, subdomain, record_type):
        """
        Collect the record ids of a subdomain.
        :param domain: domain name
        :param subdomain: subdomain
        :param record_type: record type, e.g. A/NS/MX/TXT/CNAME/SRV/AAAA/CAA/REDIRECT_URL/FORWARD_URL
        :return: list of record ids (empty when nothing matches)
        """
        return [record.get("RecordId")
                for record in self._matching_records(domain, subdomain, record_type)]

    def get_domain_record_id(self, domain, subdomain, record_type, record_value):
        """
        Find the record id of a subdomain record with a specific value.
        :param domain: domain name
        :param subdomain: subdomain
        :param record_type: record type, e.g. A/NS/MX/TXT/CNAME/SRV/AAAA/CAA/REDIRECT_URL/FORWARD_URL
        :param record_value: record value to match
        :return: record id, or an empty string when nothing matches
        """
        record_id = ""
        # When several records match, the last one in the response wins.
        for record in self._matching_records(domain, subdomain, record_type):
            if record.get("Value") == record_value:
                record_id = record.get("RecordId")

        return record_id

    def add_domain_record(self, domain, subdomain, record_type, record_value, ttl=600):
        """
        Add a subdomain record.
        :param domain: domain name
        :param subdomain: subdomain
        :param record_type: record type
        :param record_value: record value
        :param ttl: time to live; converted to int before it is sent
        :return: raw JSON response as a string
        """
        req = AddDomainRecordRequest()
        req.set_accept_format("json")
        req.set_DomainName(domain)
        req.set_RR(subdomain)
        req.set_Type(record_type)
        req.set_Value(record_value)
        req.set_TTL(int(ttl))
        return self.bytes_to_str(self.client.do_action_with_exception(req))

    def delete_domain_record(self, domain, subdomain, record_type):
        """
        Delete every record of the subdomain with the given type and print each response.
        :param domain: domain name
        :param subdomain: subdomain
        :param record_type: record type
        """
        for rid in self.get_domain_record_ids(domain, subdomain, record_type):
            req = DeleteDomainRecordRequest()
            req.set_accept_format("json")
            req.set_RecordId(rid)
            print(self.bytes_to_str(self.client.do_action_with_exception(req)))

    def update_domain_record(self, domain, subdomain, record_type, record_value, ttl=600):
        """
        Set a new value on every record of the subdomain with the given type and print each response.
        :param domain: domain name
        :param subdomain: subdomain
        :param record_type: record type
        :param record_value: new record value
        :param ttl: time to live; converted to int, consistent with add_domain_record
        """
        for rid in self.get_domain_record_ids(domain, subdomain, record_type):
            req = UpdateDomainRecordRequest()
            req.set_accept_format("json")
            req.set_RecordId(rid)
            req.set_RR(subdomain)
            req.set_Type(record_type)
            req.set_Value(record_value)
            req.set_TTL(int(ttl))

            print(self.bytes_to_str(self.client.do_action_with_exception(req)))


class GodaddyDomainMgr(IDomainMgr):
    """
    Domain manager for GoDaddy, built on the godaddypy client.
    """

    def __init__(self, credentials):
        """
        Create the godaddypy client.
        :param credentials: object providing ``key`` and ``secret`` attributes
        """
        go_account = GoDaddyAccount(api_key=credentials.key, api_secret=credentials.secret)
        self.client = GoDaddyClient(go_account)

    def get_domain_records(self, domain, subdomain, record_type):
        """
        Query the records of a subdomain.
        :param domain: domain name
        :param subdomain: subdomain (record name)
        :param record_type: record type
        :return: the client's result serialized as a JSON string
        """
        return json.dumps(self.client.get_records(domain, record_type, subdomain))

    def add_domain_record(self, domain, subdomain, record_type, record_value, ttl=600):
        """
        Add a subdomain record.
        :param domain: domain name
        :param subdomain: subdomain (record name)
        :param record_type: record type
        :param record_value: record value
        :param ttl: time to live; defaults to 600 like update_domain_record
        :return: the client's result serialized as a JSON string
        """
        record = {'data': record_value, 'name': subdomain, 'ttl': int(ttl), 'type': record_type}
        return json.dumps(self.client.add_record(domain, record))

    def update_domain_record(self, domain, subdomain, record_type, record_value, ttl=600):
        """
        Update a subdomain record and print the client's result.
        :param domain: domain name
        :param subdomain: subdomain (record name)
        :param record_type: record type
        :param record_value: new record value
        :param ttl: time to live
        """
        record = {'data': record_value, 'name': subdomain, 'ttl': int(ttl), 'type': record_type}
        print(self.client.update_record(domain, record))

    def delete_domain_record(self, domain, subdomain, record_type):
        """
        Delete the records of a subdomain with the given type and print the client's result.
        :param domain: domain name
        :param subdomain: subdomain (record name)
        :param record_type: record type
        """
        print(self.client.delete_records(domain, subdomain, record_type))


class DomainMgr:
    """
    Provider-independent facade: every call is forwarded to the wrapped manager.
    """

    def __init__(self, mgr):
        """
        :param mgr: provider-specific manager offering the four *_domain_record(s) methods
        """
        self.mgr = mgr

    def get_domain_records(self, domain, subdomain, record_type):
        """Forward to the wrapped manager and return its result."""
        return self.mgr.get_domain_records(domain, subdomain, record_type)

    def add_domain_record(self, domain, subdomain, record_type, record_value, ttl=600):
        """Forward to the wrapped manager (ttl defaults to 600) and return its result."""
        return self.mgr.add_domain_record(domain, subdomain, record_type, record_value, ttl)

    def update_domain_record(self, domain, subdomain, record_type, record_value, ttl=600):
        """Forward to the wrapped manager (ttl defaults to 600) and return its result."""
        return self.mgr.update_domain_record(domain, subdomain, record_type, record_value, ttl)

    def delete_domain_record(self, domain, subdomain, record_type):
        """Forward to the wrapped manager and return its result, like the other methods."""
        return self.mgr.delete_domain_record(domain, subdomain, record_type)


if __name__ == '__main__':
    # Parse the command-line arguments
    parser = OptionParser()
    parser.add_option("-r", "--registrar", action="store", type="str", dest="registrar",
                      help="域名注册商:aliyun/godaddy")
    parser.add_option("-a", "--action", action="store", type="str", dest="action", help="动作:check/add/update/delete")
    parser.add_option("-d", "--domain", action="store", type="str", dest="domain", help="域名")
    parser.add_option("-s", "--subdomain", action="store", type="str", dest="sub_domain", help="子域名")
    parser.add_option("-t", "--type", action="store", type="str", dest="sub_domain_type",
                      help="子域名记录类型:A/NS/MX/TXT/CNAME/SRV/AAAA/CAA/REDIRECT_URL/FORWARD_URL")
    parser.add_option("-v", "--value", action="store", type="str", dest="sub_domain_value", help="子域名记录值")
    (options, args) = parser.parse_args()

    registrar = options.registrar
    action = options.action
    my_domain = options.domain
    my_sub_domain = options.sub_domain
    my_sub_domain_type = options.sub_domain_type
    my_sub_domain_value = options.sub_domain_value

    # Build the provider-specific manager. A missing or unknown registrar is
    # rejected here; otherwise the dispatch below would fail on a None manager.
    if registrar == "aliyun":
        my_access_key = "xxxxxxxxxxxx"
        my_access_secret = "xxxxxxxxxxxx"
        REGION = "cn-hangzhou"
        my_credit = Credentials(my_access_key, my_access_secret, REGION)
        ali_mgr = AliyunDomainMgr(my_credit)
        domain_mgr = DomainMgr(ali_mgr)
    elif registrar == "godaddy":
        my_access_key = "xxxxxxxxxxxx"
        my_access_secret = "xxxxxxxxxxxx"
        my_credit = Credentials(my_access_key, my_access_secret)
        godaddy_mgr = GodaddyDomainMgr(my_credit)
        domain_mgr = DomainMgr(godaddy_mgr)
    else:
        # parser.error prints the usage plus this message and exits with status 2
        parser.error("无法识别域名注册商{0},请使用-r指定aliyun或godaddy.".format(registrar))

    # Dispatch the requested action
    if action == "add":
        print(domain_mgr.add_domain_record(my_domain, my_sub_domain, my_sub_domain_type, my_sub_domain_value))
    elif action == "delete":
        domain_mgr.delete_domain_record(my_domain, my_sub_domain, my_sub_domain_type)
    elif action == "check":
        print(domain_mgr.get_domain_records(my_domain, my_sub_domain, my_sub_domain_type))
    elif action == "update":
        domain_mgr.update_domain_record(my_domain, my_sub_domain, my_sub_domain_type, my_sub_domain_value)
    else:
        # optparse's Values object has no get(); report the parsed action directly
        print("无法识别操作类型{0}.".format(action))
        sys.exit(1)

脚本使用样例:

1
2
3
4
5
6
7
8
9
10
11
# -r selects the registrar (aliyun or godaddy) and is required by the script.

# Query the records of a subdomain
python domain.py -r aliyun -a check -d yourdomain.com -s yoursubdomain -t TXT

# Add a subdomain record
python domain.py -r aliyun -a add -d yourdomain.com -s yoursubdomain -t TXT -v yourvalue

# Update a subdomain record
python domain.py -r aliyun -a update -d yourdomain.com -s yoursubdomain -t TXT -v yourvalue

# Delete the records of a subdomain (matches on subdomain and type; -v is not used by delete)
python domain.py -r aliyun -a delete -d yourdomain.com -s yoursubdomain -t TXT

Let’s Encrypt管理脚本

官网地址:https://letsencrypt.org/

官方使用文档:https://eff-certbot.readthedocs.io/en/stable/using.html

请自行按文档安装certbot命令!

manual-auth-hook脚本样例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
#!/bin/sh
# certbot --manual-auth-hook: publish the DNS-01 challenge as a TXT record and,
# for the last outstanding challenge, wait until the record is visible in DNS.
# Reads CERTBOT_DOMAIN, CERTBOT_VALIDATION and CERTBOT_REMAINING_CHALLENGES
# from the environment.

ACME="_acme-challenge"
# Must match the directory used when the virtual environment was created
PYTHON_PATH="/opt/domain_manager/venv/bin/python"
DOMAIN_SCRIPT_BASE="/opt/domain_manager"
# Number of DNS lookups before giving up
MAX_ATTEMPTS=4

"${PYTHON_PATH}" "${DOMAIN_SCRIPT_BASE}/domain.py" -r godaddy -a add -d "${CERTBOT_DOMAIN}" -s "${ACME}" -t TXT -v "${CERTBOT_VALIDATION}"
if [ $? -eq 0 ]
then
    echo "域名${CERTBOT_DOMAIN}的子域名${ACME}添加成功."

    # More challenges are still to be published: skip the DNS wait for now
    if [ "${CERTBOT_REMAINING_CHALLENGES:-0}" -gt 0 ]
    then
        exit 0
    fi
else
    echo "域名${CERTBOT_DOMAIN}的子域名${ACME}添加失败."
    exit 1
fi

# Sleep 30 seconds before the first lookup
sleep 30

for i in $(seq "${MAX_ATTEMPTS}")
do
    acme_value=$(dig @114.114.114.114 -t txt "${ACME}.${CERTBOT_DOMAIN}" +nocomments +noquestion +noauthority +noadditional +nostats | awk '{if (NR>3){print}}' | awk '{print $5}' | sed 's/"//g')

    # -F: match the token literally; --: a token starting with "-" is not an option
    match_result=$(printf '%s\n' "${acme_value}" | grep -F -- "${CERTBOT_VALIDATION}")

    if [ -n "${match_result}" ]
    then
        echo "检测到${CERTBOT_VALIDATION}在${ACME}.${CERTBOT_DOMAIN}的TXT记录中."
        exit 0
    fi

    if [ "$i" -eq "${MAX_ATTEMPTS}" ]
    then
        echo "已尝试${MAX_ATTEMPTS}次,未检测到${CERTBOT_VALIDATION}在${ACME}.${CERTBOT_DOMAIN}的TXT记录中,退出."
        exit 1
    fi

    echo "未检测到${CERTBOT_VALIDATION}在${ACME}.${CERTBOT_DOMAIN}的TXT记录中,重试."
    echo "${ACME}.${CERTBOT_DOMAIN}的TXT记录:"
    echo ${acme_value}
    echo ""
    sleep 30
done

manual-cleanup-hook脚本样例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
#!/bin/sh
# certbot --manual-cleanup-hook: remove the _acme-challenge TXT record(s) again.
# Reads CERTBOT_DOMAIN and CERTBOT_VALIDATION from the environment.

ACME="_acme-challenge"
# Must match the directory used when the virtual environment was created
PYTHON_PATH="/opt/domain_manager/venv/bin/python"
DOMAIN_SCRIPT_BASE="/opt/domain_manager"

echo "清理${ACME}.${CERTBOT_DOMAIN}..."
"${PYTHON_PATH}" "${DOMAIN_SCRIPT_BASE}/domain.py" -r godaddy -a delete -d "${CERTBOT_DOMAIN}" -s "${ACME}" -t TXT -v "${CERTBOT_VALIDATION}"
if [ $? -eq 0 ]
then
    echo "清理成功."
    exit 0
else
    echo "清理失败."
    exit 1
fi

生成证书和自动更新证书

1
2
3
4
5
# Issue a wildcard certificate.
# -m takes the contact e-mail address; the wildcard name is quoted so the shell cannot glob-expand it.
certbot certonly --agree-tos -m admin@yourdomain.com --manual --preferred-challenges dns -d yourdomain.com -d '*.yourdomain.com' --manual-auth-hook /opt/domain_manager/certbot_renew.sh --manual-cleanup-hook /opt/domain_manager/certbot_clear.sh

# Renew the certificate
certbot renew --agree-tos -m admin@yourdomain.com --cert-name yourdomain.com --manual --preferred-challenges dns --manual-auth-hook /opt/domain_manager/certbot_renew.sh --manual-cleanup-hook /opt/domain_manager/certbot_clear.sh

定时更新证书

将自动更新证书的命令添加到crontab即可:

1
2
3
# Renew on the 1st of every month at 00:00; stderr is merged into stdout so errors end up in the log as well
crontab -e
0 0 1 * * certbot renew --agree-tos -m admin@yourdomain.com --cert-name yourdomain.com --manual --preferred-challenges dns --manual-auth-hook /opt/domain_manager/certbot_renew.sh --manual-cleanup-hook /opt/domain_manager/certbot_clear.sh 2>&1 | tee /tmp/letencrypt.log